Module amnes.exec.controller.project_execution
This module contains classes and functions for AMNES project execution.
Classes
ProjectExecutionManager: Manager class for executing an entire AMNES Project.
Expand source code
"""This module contains classes and functions for AMNES project execution.
Classes:
ProjectExecutionManager: Manager class for executing an entire AMNES Project.
"""
from logging import Logger
from typing import TYPE_CHECKING, Dict, Iterator, List, Tuple
import Pyro5.api as Pyro5
from Pyro5.errors import PyroError
from ...core.amnes_project import AmnesProject
from ...core.experiment import ConcreteExperiment, ExperimentState
from ...core.experiment_sequence import ExperimentSequence
from ...core.worker_endpoint import WorkerEndpoint
from ...data.manager.storage_backend import ConcreteExperimentReference
from ..logging import InstanceLogging
from ..worker.app import RemoteWorkerManager
from .experiment_execution import ExperimentExecutionManager
from .resultmanager import ExperimentReference
if TYPE_CHECKING:
from .app import Controller # pylint: disable=cyclic-import
class ProjectExecutionManager(InstanceLogging):
    """Manager class for executing an entire AMNES Project.

    A call to `run` performs these steps in order: every CREATED experiment
    repetition is moved to PENDING, one remote worker manager proxy is created
    per node of the project template, every proxy is pinged, and finally all
    PENDING repetitions are executed one after another.
    """

    # Logger identifier of this manager.
    # NOTE(review): presumably used by the caller as child logger name, in the
    # same way `run` uses `ExperimentExecutionManager.LOGID` - confirm.
    LOGID = "projectexecutionmanager"
    # Value assigned to `_pyroTimeout` of every remote worker manager proxy
    # (Pyro5 documents this attribute as a timeout in seconds).
    DEFAULT_TIMEOUT = 20

    def __init__(
        self, logger: Logger, project: AmnesProject, controller: "Controller"
    ) -> None:
        """Project execution manager constructor method.

        Args:
            logger (Logger): Logger for object instance.
            project (AmnesProject): Project to be executed.
            controller (Controller): Controller which started the project execution.
        """
        InstanceLogging.__init__(self, logger)
        self.__project: AmnesProject = project
        self.__controller: "Controller" = controller
        # Maps node slug -> (endpoint, proxy); filled by `__init_managers`.
        self.__workers: Dict[str, Tuple[WorkerEndpoint, RemoteWorkerManager]] = {}

    def run(self) -> None:
        """Run project execution.

        Logs an error and returns early, without executing any experiment, if
        at least one worker does not answer the availability check.

        While a repetition is being executed, `current_experiment` of the
        controller references it; the attribute is reset to `None` afterwards,
        even if the execution or a state update raises.

        Raises:
            ValueError: If storage backend is not available.
        """
        self.logger.info(
            f"Execution of AMNES Project '{self.__project.slug}' started ..."
        )
        self.logger.info("Moving CREATED experiments to PENDING state ...")
        self.logger.info(
            f"{self.__schedule_experiments()} experiments were moved to PENDING state."
        )
        self.logger.info("Initializing remote worker managers for every node ...")
        self.logger.info(
            f"{self.__init_managers()} remote worker managers were initialized."
        )
        self.logger.info("Checking for availability of all workers ...")
        unavailable = self.__check_managers()
        if unavailable:
            # One log record listing every unreachable endpoint on its own line.
            self.logger.error(
                "Following workers are not available:\n"
                + "".join(
                    f"~ '{endpoint.address}:{endpoint.port}'\n"
                    for endpoint in unavailable
                )
            )
            return
        self.logger.info("All workers available.")
        self.logger.info("Starting experiment executions ...")
        for sequence, experiment, repetition in self.__pending_experiments():
            # Identifier used in log messages: <pset>$<experiment>$<repetition>.
            label = f"{sequence.pset.slug}${experiment.slug}${repetition}"
            self.__controller.current_experiment = ExperimentReference(
                concrete_experiment_reference=ConcreteExperimentReference(
                    concrete_experiment_slug=experiment.slug,
                    sequence_parameter_set_slug=sequence.pset.slug,
                    amnes_project_slug=self.__project.slug,
                ),
                repetition=repetition,
            )
            try:
                self.logger.info(f"Initiate execution of '{label}' ...")
                self.__update_experiment_state(
                    sequence, experiment, repetition, ExperimentState.RUNNING
                )
                result = ExperimentExecutionManager(
                    self.__controller.logger.getChild(
                        ExperimentExecutionManager.LOGID
                    ),
                    self.__controller,
                    experiment,
                    repetition,
                ).run()
                self.__update_experiment_state(
                    sequence, experiment, repetition, result
                )
                self.logger.info(
                    f"Execution finished for '{label}' with result {result}."
                )
            finally:
                # Never leave a stale reference behind, not even on failure.
                self.__controller.current_experiment = None
        self.logger.info("All pending experiments were processed.")
        self.logger.info(
            f"Execution of AMNES Project '{self.__project.slug}' finished."
        )

    def __update_experiment_state(
        self,
        sequence: ExperimentSequence,
        experiment: ConcreteExperiment,
        repetition: int,
        state: ExperimentState,
    ) -> None:
        """Update experiment state of specific repetition.

        The state is set on the experiment object and written to the storage
        backend of the controller. The storage backend is checked first, so the
        experiment object is left untouched if no backend is available.

        Args:
            sequence (ExperimentSequence): Sequence to which the experiment belongs.
            experiment (ConcreteExperiment): Experiment reference to be updated.
            repetition (int): Repetition whose state should be updated.
            state (ExperimentState): New state of the repetition which should be
                                     updated.

        Raises:
            ValueError: If storage backend is not available.
        """
        storage = self.__controller.storage
        if not storage:
            self.logger.error("No Storage Backend available!")
            raise ValueError("No Storage Backend available!")
        experiment.set_state(repetition, state)
        storage.update_experiment_states(
            # Only the changed repetition is written, as (repetition, state) items.
            {repetition: state}.items(),
            ConcreteExperimentReference(
                concrete_experiment_slug=experiment.slug,
                sequence_parameter_set_slug=sequence.pset.slug,
                amnes_project_slug=self.__project.slug,
            ),
        )

    def __schedule_experiments(self) -> int:
        """Set all CREATED experiments of project to PENDING state.

        After the repetitions of an experiment were processed, all states of
        that experiment are written to the storage backend of the controller.
        The storage backend is checked before an experiment is modified.

        Returns:
            int: Number of experiments set to PENDING state.

        Raises:
            ValueError: If storage backend is not available.
        """
        counter: int = 0
        for sequence in self.__project.sequences:
            for experiment in sequence.experiments:
                storage = self.__controller.storage
                if not storage:
                    self.logger.error("No Storage Backend available!")
                    raise ValueError("No Storage Backend available!")
                # NOTE(review): `states` is iterated while `set_state` is called;
                # this assumes `set_state` only replaces the value of an existing
                # repetition - confirm against ConcreteExperiment.
                for rep_key, rep_value in experiment.states:
                    if rep_value == ExperimentState.CREATED:
                        experiment.set_state(rep_key, ExperimentState.PENDING)
                        counter += 1
                storage.update_experiment_states(
                    experiment.states,
                    ConcreteExperimentReference(
                        concrete_experiment_slug=experiment.slug,
                        sequence_parameter_set_slug=sequence.pset.slug,
                        amnes_project_slug=self.__project.slug,
                    ),
                )
        return counter

    def __init_managers(self) -> int:
        """Initialize remote worker manager for every node.

        For every node of the project template a Pyro5 proxy is created, its
        timeout is set to `DEFAULT_TIMEOUT` and it is stored together with the
        endpoint of the node under the node's slug.

        Returns:
            int: Number of remote worker managers initialized.
        """
        counter: int = 0
        for node in self.__project.template.nodes:
            endpoint = node.endpoint
            manager = Pyro5.Proxy(
                f"PYRO:{RemoteWorkerManager.PYROID}@{endpoint.address}:{endpoint.port}"
            )
            manager._pyroTimeout = (  # pylint: disable=protected-access
                ProjectExecutionManager.DEFAULT_TIMEOUT
            )
            self.__workers[node.slug] = (endpoint, manager)
            counter += 1
        return counter

    def __check_managers(self) -> List[WorkerEndpoint]:
        """Checks all remote managers for availability.

        Every stored remote worker manager is pinged; a `PyroError` raised by
        the ping marks the corresponding endpoint as unavailable.

        Returns:
            List[WorkerEndpoint]: List of all WorkerEndpoints which did not respond.
        """
        unavailable: List[WorkerEndpoint] = []
        for endpoint, manager in self.__workers.values():
            try:
                manager.ping()
            except PyroError:
                unavailable.append(endpoint)
        return unavailable

    def __pending_experiments(
        self,
    ) -> Iterator[Tuple[ExperimentSequence, ConcreteExperiment, int]]:
        """Generator for all pending experiments of AMNES Project.

        The whole project is scanned repeatedly until one complete pass finds
        no repetition in PENDING state. A repetition which is still PENDING
        when the next pass reaches it is therefore yielded again.

        Yields:
            ExperimentSequence: Sequence of next pending experiment.
            ConcreteExperiment: Instance of next pending experiment.
            int: Repetition of next pending experiment.
        """
        done: bool = False
        while not done:
            # Assume this pass is the last one until a PENDING repetition is found.
            done = True
            for sequence in self.__project.sequences:
                for experiment in sequence.experiments:
                    for repetition, state in experiment.states:
                        if state == ExperimentState.PENDING:
                            done = False
                            yield (sequence, experiment, repetition)
Classes
class ProjectExecutionManager (logger: logging.Logger, project: AmnesProject, controller: Controller)
-
Manager class for executing an entire AMNES Project.
Project execution manager constructor method.
Args
logger
:Logger
- Logger for object instance.
project
:AmnesProject
- Project to be executed.
controller
:Controller
- Controller which started the project execution.
Expand source code
class ProjectExecutionManager(InstanceLogging):
    """Manager class for executing an entire AMNES Project.

    A call to `run` performs these steps in order: every CREATED experiment
    repetition is moved to PENDING, one remote worker manager proxy is created
    per node of the project template, every proxy is pinged, and finally all
    PENDING repetitions are executed one after another.
    """

    # Logger identifier of this manager.
    # NOTE(review): presumably used by the caller as child logger name, in the
    # same way `run` uses `ExperimentExecutionManager.LOGID` - confirm.
    LOGID = "projectexecutionmanager"
    # Value assigned to `_pyroTimeout` of every remote worker manager proxy
    # (Pyro5 documents this attribute as a timeout in seconds).
    DEFAULT_TIMEOUT = 20

    def __init__(
        self, logger: Logger, project: AmnesProject, controller: "Controller"
    ) -> None:
        """Project execution manager constructor method.

        Args:
            logger (Logger): Logger for object instance.
            project (AmnesProject): Project to be executed.
            controller (Controller): Controller which started the project execution.
        """
        InstanceLogging.__init__(self, logger)
        self.__project: AmnesProject = project
        self.__controller: "Controller" = controller
        # Maps node slug -> (endpoint, proxy); filled by `__init_managers`.
        self.__workers: Dict[str, Tuple[WorkerEndpoint, RemoteWorkerManager]] = {}

    def run(self) -> None:
        """Run project execution.

        Logs an error and returns early, without executing any experiment, if
        at least one worker does not answer the availability check.

        While a repetition is being executed, `current_experiment` of the
        controller references it; the attribute is reset to `None` afterwards,
        even if the execution or a state update raises.

        Raises:
            ValueError: If storage backend is not available.
        """
        self.logger.info(
            f"Execution of AMNES Project '{self.__project.slug}' started ..."
        )
        self.logger.info("Moving CREATED experiments to PENDING state ...")
        self.logger.info(
            f"{self.__schedule_experiments()} experiments were moved to PENDING state."
        )
        self.logger.info("Initializing remote worker managers for every node ...")
        self.logger.info(
            f"{self.__init_managers()} remote worker managers were initialized."
        )
        self.logger.info("Checking for availability of all workers ...")
        unavailable = self.__check_managers()
        if unavailable:
            # One log record listing every unreachable endpoint on its own line.
            self.logger.error(
                "Following workers are not available:\n"
                + "".join(
                    f"~ '{endpoint.address}:{endpoint.port}'\n"
                    for endpoint in unavailable
                )
            )
            return
        self.logger.info("All workers available.")
        self.logger.info("Starting experiment executions ...")
        for sequence, experiment, repetition in self.__pending_experiments():
            # Identifier used in log messages: <pset>$<experiment>$<repetition>.
            label = f"{sequence.pset.slug}${experiment.slug}${repetition}"
            self.__controller.current_experiment = ExperimentReference(
                concrete_experiment_reference=ConcreteExperimentReference(
                    concrete_experiment_slug=experiment.slug,
                    sequence_parameter_set_slug=sequence.pset.slug,
                    amnes_project_slug=self.__project.slug,
                ),
                repetition=repetition,
            )
            try:
                self.logger.info(f"Initiate execution of '{label}' ...")
                self.__update_experiment_state(
                    sequence, experiment, repetition, ExperimentState.RUNNING
                )
                result = ExperimentExecutionManager(
                    self.__controller.logger.getChild(
                        ExperimentExecutionManager.LOGID
                    ),
                    self.__controller,
                    experiment,
                    repetition,
                ).run()
                self.__update_experiment_state(
                    sequence, experiment, repetition, result
                )
                self.logger.info(
                    f"Execution finished for '{label}' with result {result}."
                )
            finally:
                # Never leave a stale reference behind, not even on failure.
                self.__controller.current_experiment = None
        self.logger.info("All pending experiments were processed.")
        self.logger.info(
            f"Execution of AMNES Project '{self.__project.slug}' finished."
        )

    def __update_experiment_state(
        self,
        sequence: ExperimentSequence,
        experiment: ConcreteExperiment,
        repetition: int,
        state: ExperimentState,
    ) -> None:
        """Update experiment state of specific repetition.

        The state is set on the experiment object and written to the storage
        backend of the controller. The storage backend is checked first, so the
        experiment object is left untouched if no backend is available.

        Args:
            sequence (ExperimentSequence): Sequence to which the experiment belongs.
            experiment (ConcreteExperiment): Experiment reference to be updated.
            repetition (int): Repetition whose state should be updated.
            state (ExperimentState): New state of the repetition which should be
                                     updated.

        Raises:
            ValueError: If storage backend is not available.
        """
        storage = self.__controller.storage
        if not storage:
            self.logger.error("No Storage Backend available!")
            raise ValueError("No Storage Backend available!")
        experiment.set_state(repetition, state)
        storage.update_experiment_states(
            # Only the changed repetition is written, as (repetition, state) items.
            {repetition: state}.items(),
            ConcreteExperimentReference(
                concrete_experiment_slug=experiment.slug,
                sequence_parameter_set_slug=sequence.pset.slug,
                amnes_project_slug=self.__project.slug,
            ),
        )

    def __schedule_experiments(self) -> int:
        """Set all CREATED experiments of project to PENDING state.

        After the repetitions of an experiment were processed, all states of
        that experiment are written to the storage backend of the controller.
        The storage backend is checked before an experiment is modified.

        Returns:
            int: Number of experiments set to PENDING state.

        Raises:
            ValueError: If storage backend is not available.
        """
        counter: int = 0
        for sequence in self.__project.sequences:
            for experiment in sequence.experiments:
                storage = self.__controller.storage
                if not storage:
                    self.logger.error("No Storage Backend available!")
                    raise ValueError("No Storage Backend available!")
                # NOTE(review): `states` is iterated while `set_state` is called;
                # this assumes `set_state` only replaces the value of an existing
                # repetition - confirm against ConcreteExperiment.
                for rep_key, rep_value in experiment.states:
                    if rep_value == ExperimentState.CREATED:
                        experiment.set_state(rep_key, ExperimentState.PENDING)
                        counter += 1
                storage.update_experiment_states(
                    experiment.states,
                    ConcreteExperimentReference(
                        concrete_experiment_slug=experiment.slug,
                        sequence_parameter_set_slug=sequence.pset.slug,
                        amnes_project_slug=self.__project.slug,
                    ),
                )
        return counter

    def __init_managers(self) -> int:
        """Initialize remote worker manager for every node.

        For every node of the project template a Pyro5 proxy is created, its
        timeout is set to `DEFAULT_TIMEOUT` and it is stored together with the
        endpoint of the node under the node's slug.

        Returns:
            int: Number of remote worker managers initialized.
        """
        counter: int = 0
        for node in self.__project.template.nodes:
            endpoint = node.endpoint
            manager = Pyro5.Proxy(
                f"PYRO:{RemoteWorkerManager.PYROID}@{endpoint.address}:{endpoint.port}"
            )
            manager._pyroTimeout = (  # pylint: disable=protected-access
                ProjectExecutionManager.DEFAULT_TIMEOUT
            )
            self.__workers[node.slug] = (endpoint, manager)
            counter += 1
        return counter

    def __check_managers(self) -> List[WorkerEndpoint]:
        """Checks all remote managers for availability.

        Every stored remote worker manager is pinged; a `PyroError` raised by
        the ping marks the corresponding endpoint as unavailable.

        Returns:
            List[WorkerEndpoint]: List of all WorkerEndpoints which did not respond.
        """
        unavailable: List[WorkerEndpoint] = []
        for endpoint, manager in self.__workers.values():
            try:
                manager.ping()
            except PyroError:
                unavailable.append(endpoint)
        return unavailable

    def __pending_experiments(
        self,
    ) -> Iterator[Tuple[ExperimentSequence, ConcreteExperiment, int]]:
        """Generator for all pending experiments of AMNES Project.

        The whole project is scanned repeatedly until one complete pass finds
        no repetition in PENDING state. A repetition which is still PENDING
        when the next pass reaches it is therefore yielded again.

        Yields:
            ExperimentSequence: Sequence of next pending experiment.
            ConcreteExperiment: Instance of next pending experiment.
            int: Repetition of next pending experiment.
        """
        done: bool = False
        while not done:
            # Assume this pass is the last one until a PENDING repetition is found.
            done = True
            for sequence in self.__project.sequences:
                for experiment in sequence.experiments:
                    for repetition, state in experiment.states:
                        if state == ExperimentState.PENDING:
                            done = False
                            yield (sequence, experiment, repetition)
Ancestors
Class variables
var DEFAULT_TIMEOUT
var LOGID
Methods
def run(self) -> NoneType
-
Run project execution.
Expand source code
def run(self) -> None:
    """Run project execution.

    Moves CREATED experiments to PENDING, initializes one remote worker
    manager per node, checks that all workers are available and then
    executes every pending experiment repetition one after another.

    Logs an error and returns early, without executing any experiment, if
    at least one worker does not answer the availability check.

    While a repetition is being executed, `current_experiment` of the
    controller references it; the attribute is reset to `None` afterwards,
    even if the execution or a state update raises.
    """
    self.logger.info(
        f"Execution of AMNES Project '{self.__project.slug}' started ..."
    )
    self.logger.info("Moving CREATED experiments to PENDING state ...")
    self.logger.info(
        f"{self.__schedule_experiments()} experiments were moved to PENDING state."
    )
    self.logger.info("Initializing remote worker managers for every node ...")
    self.logger.info(
        f"{self.__init_managers()} remote worker managers were initialized."
    )
    self.logger.info("Checking for availability of all workers ...")
    unavailable = self.__check_managers()
    if unavailable:
        # One log record listing every unreachable endpoint on its own line.
        self.logger.error(
            "Following workers are not available:\n"
            + "".join(
                f"~ '{endpoint.address}:{endpoint.port}'\n"
                for endpoint in unavailable
            )
        )
        return
    self.logger.info("All workers available.")
    self.logger.info("Starting experiment executions ...")
    for sequence, experiment, repetition in self.__pending_experiments():
        # Identifier used in log messages: <pset>$<experiment>$<repetition>.
        label = f"{sequence.pset.slug}${experiment.slug}${repetition}"
        self.__controller.current_experiment = ExperimentReference(
            concrete_experiment_reference=ConcreteExperimentReference(
                concrete_experiment_slug=experiment.slug,
                sequence_parameter_set_slug=sequence.pset.slug,
                amnes_project_slug=self.__project.slug,
            ),
            repetition=repetition,
        )
        try:
            self.logger.info(f"Initiate execution of '{label}' ...")
            self.__update_experiment_state(
                sequence, experiment, repetition, ExperimentState.RUNNING
            )
            result = ExperimentExecutionManager(
                self.__controller.logger.getChild(ExperimentExecutionManager.LOGID),
                self.__controller,
                experiment,
                repetition,
            ).run()
            self.__update_experiment_state(sequence, experiment, repetition, result)
            self.logger.info(
                f"Execution finished for '{label}' with result {result}."
            )
        finally:
            # Never leave a stale reference behind, not even on failure.
            self.__controller.current_experiment = None
    self.logger.info("All pending experiments were processed.")
    self.logger.info(
        f"Execution of AMNES Project '{self.__project.slug}' finished."
    )
Inherited members