Module amnes.exec.controller.task_execution

This module contains classes and functions for AMNES project execution.

Classes

TaskExecutionManager: Manager class for executing one node task.

Expand source code
"""This module contains classes and functions for AMNES project execution.

Classes:
    TaskExecutionManager: Manager class for executing one node task.
"""

import traceback
from logging import Logger
from typing import TYPE_CHECKING, Dict

import Pyro5.api as Pyro5
from Pyro5.errors import CommunicationError, PyroError

from ... import __version__
from ...core.node_task import NodeTask
from ...core.worker_endpoint import WorkerEndpoint
from ..logging import InstanceLogging
from ..worker.app import RemoteWorkerManager, TaskExecutionResult

if TYPE_CHECKING:
    from .app import Controller  # pylint: disable=cyclic-import


class TaskExecutionManager(InstanceLogging):
    """Manager class for executing one node task."""

    LOGID = "taskexecutionmanager"

    PYRO_CMD_TIMEOUT = 5
    PYRO_TASK_TIMEOUT = 3600

    def __init__(
        self,
        logger: Logger,
        controller: "Controller",
        endpoint: WorkerEndpoint,
        task: NodeTask,
        status: Dict[str, bool],
        identifier: str,
    ) -> None:
        """Experiment execution manager constructor method.

        Args:
            logger (Logger): Logger for object instance.
            controller (Controller): Controller which started the task execution.
            endpoint (WorkerEndpoint): Endpoint of node on which the task should
                                       be executed.
            task (NodeTask): Task which should be executed by the manager.
            status (Dict[str, bool]): Status dictionary for task's stage.
            identifier (str): Thread identifier for this task execution manager.
        """
        InstanceLogging.__init__(self, logger)
        self.__controller: "Controller" = controller
        self.__endpoint: WorkerEndpoint = endpoint
        self.__task: NodeTask = task
        self.__status: Dict[str, bool] = status
        self.__identifier: str = identifier

    def run(self) -> None:
        """Run task execution."""
        self.logger.info(
            "Initializing worker connection to "
            + f"'[{self.__endpoint.address}]:{self.__endpoint.port}' ..."
        )
        try:
            manager = self.__init_manager()
        except PyroError:
            self.logger.error(
                f"Could not initialize remote worker manager:\n"
                + f"{traceback.format_exc()}"
            )
        manager._pyroTimeout = (  # type: ignore # pylint: disable=protected-access
            TaskExecutionManager.PYRO_TASK_TIMEOUT
        )
        self.logger.info(
            "Worker connection to "
            + f"'[{self.__endpoint.address}]:{self.__endpoint.port}' initialized."
        )
        self.logger.info(
            f"Executing task '{self.__task.slug}' "
            + f"on worker '[{self.__endpoint.address}]:{self.__endpoint.port}' ..."
        )
        result = TaskExecutionResult(  # pylint: disable=no-value-for-parameter
            # type: ignore
            manager.execute_module(
                self.__task.slug,
                self.__task.module,
                self.__task.params,
                self.__task.files,
                self.__controller.configuration.execution.address,
                self.__controller.configuration.execution.port,
            )
        )
        self.logger.info(
            f"Finished task '{self.__task.slug}' "
            + f"on worker '[{self.__endpoint.address}]:{self.__endpoint.port}' "
            + f"with result '{result}'."
        )
        self.__status[self.__identifier] = result is TaskExecutionResult.SUCCESS

    def __init_manager(self) -> RemoteWorkerManager:
        """Initializes Pyro5 proxy for remote worker manager of node.

        Returns:
            RemoteWorkerManager: Pyro5 proxy for remote worker manager of node.

        Raises:
            CommunicationError: If remote worker manager test fails.
        """
        manager = Pyro5.Proxy(
            f"PYRO:{RemoteWorkerManager.PYROID}"
            + f"@{self.__endpoint.address}"
            + f":{self.__endpoint.port}"
        )
        manager._pyroTimeout = (  # pylint: disable=protected-access
            TaskExecutionManager.PYRO_CMD_TIMEOUT
        )

        pingmsg = manager.ping().split("#")
        if len(pingmsg) != 2:
            raise CommunicationError(
                "Could not successfully test remote worker manager, "
                + "received ping message invalid."
            )
        if pingmsg[0] != RemoteWorkerManager.PYROID:
            raise CommunicationError(
                "Could not successfully test remote worker manager, "
                + "identification of remote worker manager failed."
            )
        if pingmsg[1] != __version__:
            raise CommunicationError(
                "Could not successfully test remote worker manager, version mismatch, "
                + f"controller({__version__}) != worker({pingmsg[1]})."
            )

        return manager

Classes

class TaskExecutionManager (logger: logging.Logger, controller: Controller, endpoint: WorkerEndpoint, task: NodeTask, status: Dict[str, bool], identifier: str)

Manager class for executing one node task.

Experiment execution manager constructor method.

Args

logger : Logger
Logger for object instance.
controller : Controller
Controller which started the task execution.
endpoint : WorkerEndpoint
Endpoint of node on which the task should be executed.
task : NodeTask
Task which should be executed by the manager.
status : Dict[str, bool]
Status dictionary for task's stage.
identifier : str
Thread identifier for this task execution manager.
Expand source code
class TaskExecutionManager(InstanceLogging):
    """Manager class for executing one node task."""

    LOGID = "taskexecutionmanager"

    PYRO_CMD_TIMEOUT = 5
    PYRO_TASK_TIMEOUT = 3600

    def __init__(
        self,
        logger: Logger,
        controller: "Controller",
        endpoint: WorkerEndpoint,
        task: NodeTask,
        status: Dict[str, bool],
        identifier: str,
    ) -> None:
        """Experiment execution manager constructor method.

        Args:
            logger (Logger): Logger for object instance.
            controller (Controller): Controller which started the task execution.
            endpoint (WorkerEndpoint): Endpoint of node on which the task should
                                       be executed.
            task (NodeTask): Task which should be executed by the manager.
            status (Dict[str, bool]): Status dictionary for task's stage.
            identifier (str): Thread identifier for this task execution manager.
        """
        InstanceLogging.__init__(self, logger)
        self.__controller: "Controller" = controller
        self.__endpoint: WorkerEndpoint = endpoint
        self.__task: NodeTask = task
        self.__status: Dict[str, bool] = status
        self.__identifier: str = identifier

    def run(self) -> None:
        """Run task execution."""
        self.logger.info(
            "Initializing worker connection to "
            + f"'[{self.__endpoint.address}]:{self.__endpoint.port}' ..."
        )
        try:
            manager = self.__init_manager()
        except PyroError:
            self.logger.error(
                f"Could not initialize remote worker manager:\n"
                + f"{traceback.format_exc()}"
            )
        manager._pyroTimeout = (  # type: ignore # pylint: disable=protected-access
            TaskExecutionManager.PYRO_TASK_TIMEOUT
        )
        self.logger.info(
            "Worker connection to "
            + f"'[{self.__endpoint.address}]:{self.__endpoint.port}' initialized."
        )
        self.logger.info(
            f"Executing task '{self.__task.slug}' "
            + f"on worker '[{self.__endpoint.address}]:{self.__endpoint.port}' ..."
        )
        result = TaskExecutionResult(  # pylint: disable=no-value-for-parameter
            # type: ignore
            manager.execute_module(
                self.__task.slug,
                self.__task.module,
                self.__task.params,
                self.__task.files,
                self.__controller.configuration.execution.address,
                self.__controller.configuration.execution.port,
            )
        )
        self.logger.info(
            f"Finished task '{self.__task.slug}' "
            + f"on worker '[{self.__endpoint.address}]:{self.__endpoint.port}' "
            + f"with result '{result}'."
        )
        self.__status[self.__identifier] = result is TaskExecutionResult.SUCCESS

    def __init_manager(self) -> RemoteWorkerManager:
        """Initializes Pyro5 proxy for remote worker manager of node.

        Returns:
            RemoteWorkerManager: Pyro5 proxy for remote worker manager of node.

        Raises:
            CommunicationError: If remote worker manager test fails.
        """
        manager = Pyro5.Proxy(
            f"PYRO:{RemoteWorkerManager.PYROID}"
            + f"@{self.__endpoint.address}"
            + f":{self.__endpoint.port}"
        )
        manager._pyroTimeout = (  # pylint: disable=protected-access
            TaskExecutionManager.PYRO_CMD_TIMEOUT
        )

        pingmsg = manager.ping().split("#")
        if len(pingmsg) != 2:
            raise CommunicationError(
                "Could not successfully test remote worker manager, "
                + "received ping message invalid."
            )
        if pingmsg[0] != RemoteWorkerManager.PYROID:
            raise CommunicationError(
                "Could not successfully test remote worker manager, "
                + "identification of remote worker manager failed."
            )
        if pingmsg[1] != __version__:
            raise CommunicationError(
                "Could not successfully test remote worker manager, version mismatch, "
                + f"controller({__version__}) != worker({pingmsg[1]})."
            )

        return manager

Ancestors

Class variables

var LOGID
var PYRO_CMD_TIMEOUT
var PYRO_TASK_TIMEOUT

Methods

def run(self) -> NoneType

Run task execution.

Expand source code
def run(self) -> None:
    """Run task execution."""
    self.logger.info(
        "Initializing worker connection to "
        + f"'[{self.__endpoint.address}]:{self.__endpoint.port}' ..."
    )
    try:
        manager = self.__init_manager()
    except PyroError:
        self.logger.error(
            f"Could not initialize remote worker manager:\n"
            + f"{traceback.format_exc()}"
        )
    manager._pyroTimeout = (  # type: ignore # pylint: disable=protected-access
        TaskExecutionManager.PYRO_TASK_TIMEOUT
    )
    self.logger.info(
        "Worker connection to "
        + f"'[{self.__endpoint.address}]:{self.__endpoint.port}' initialized."
    )
    self.logger.info(
        f"Executing task '{self.__task.slug}' "
        + f"on worker '[{self.__endpoint.address}]:{self.__endpoint.port}' ..."
    )
    result = TaskExecutionResult(  # pylint: disable=no-value-for-parameter
        # type: ignore
        manager.execute_module(
            self.__task.slug,
            self.__task.module,
            self.__task.params,
            self.__task.files,
            self.__controller.configuration.execution.address,
            self.__controller.configuration.execution.port,
        )
    )
    self.logger.info(
        f"Finished task '{self.__task.slug}' "
        + f"on worker '[{self.__endpoint.address}]:{self.__endpoint.port}' "
        + f"with result '{result}'."
    )
    self.__status[self.__identifier] = result is TaskExecutionResult.SUCCESS

Inherited members