Module amnes.exec.worker.app
This module contains all classes and functions for the worker app.
Classes
TaskExecutionResult: Task Execution Results. Worker: AMNES Worker application. RemoteWorkerManager: Remote manager for AMNES Worker instance. TaskFunctionProvider: Provider for functions and methods bound to modules for task execution.
Expand source code
"""This module contains all classes and functions for the worker app.
Classes:
TaskExecutionResult: Task Execution Results.
Worker: AMNES Worker application.
RemoteWorkerManager: Remote manager for AMNES Worker instance.
TaskFunctionProvider: Provider for functions and methods bound to modules for
task execution.
"""
from __future__ import annotations
import ipaddress
import os
import shutil
import socket
import sys
import uuid
from contextlib import contextmanager
from enum import Enum
from importlib import import_module
from pathlib import Path
from threading import Lock
from typing import BinaryIO, Iterator, Optional, Type, TypeVar
import Pyro5.api as Pyro5
from Pyro5.errors import PyroError
from ... import __version__
from ...core.node_task import NodeTaskFiles, NodeTaskParams
from ..app import AmnesRemoteException, ExecutionApp
from ..controller.resultmanager import RemoteResultManager
from .config import WorkerConfiguration
from .node_module import NodeModule, NodeModuleError
_Module = TypeVar("_Module", bound=NodeModule)
class TaskExecutionResult(Enum):
"""Task Execution Results.
This enumeration contains all valid results for a task execution.
"""
def __new__(cls, value: int, doc: str) -> TaskExecutionResult:
"""Custom initializer supporting docstrings for enumeration members.
Args:
cls (TaskExecutionResult): TaskExecutionResult class.
value (int): Internal integer value used for state enum member.
doc (str): Docstring for state enum member.
Returns:
TaskExecutionResult: TaskExecutionResult instance.
"""
self = object.__new__(cls)
self._value_ = value
self.__doc__ = doc
return self
SUCCESS = (0, "Task execution was successful.")
PYTHON_MODULE_NOT_FOUND = (
11,
"Could not import python module containing module class.",
)
PYTHON_CLASS_NOT_FOUND = (
12,
"Could not find python class representing node module.",
)
MODULE_NOT_NODEMODULE = (13, "Module is not a subclass of NodeModule.")
WORKDIR_INIT_FAILED = (14, "Working directory initialization failed.")
FUNCTIONPROVIDER_INIT_FAILED = (15, "Function provider initialization failed.")
FAILURE = (255, "Task execution failed.")
def __str__(self) -> str:
"""Get string representation of enum member.
Returns:
str: String representation of enum member.
"""
return super().__str__().rsplit(".", 1)[-1]
class Worker(
ExecutionApp[WorkerConfiguration]
): # pylint: disable=unsubscriptable-object
"""AMNES Worker application.
Attributes:
configuration (WorkerConfiguration): Initialized configuration for
the execution application.
base (str): AMNES base directory for execution application.
"""
APPID = "worker"
def __init__(
self, configuration: WorkerConfiguration, base: str, debug: bool
) -> None:
"""Constructor method for worker application.
Args:
configuration (WorkerConfiguration): Initialized configuration for
the worker application.
base (str): AMNES base directory for worker application.
debug (bool): If debug messages should be logged.
"""
super().__init__(Worker.APPID, configuration, base, debug)
self.__daemon: Optional[Pyro5.Daemon] = None
def logic(self) -> None:
"""AMNES Worker application logic."""
self.logger.info("Initializing Pyro5 endpoint ...")
self.__daemon = Pyro5.Daemon(
host=self.configuration.execution.address,
port=self.configuration.execution.port,
)
self.__daemon.register(RemoteWorkerManager(self), RemoteWorkerManager.PYROID)
self.logger.info("Pyro5 endpoint initialized.")
self.logger.info(
"AMNES Worker started successfully, accepting connections now."
)
self.__daemon.requestLoop()
def shutdown(self) -> None:
"""AMNES Worker application shutdown handler."""
self.logger.info("Received shutdown signal for AMNES Worker, shutting down ...")
if self.__daemon:
self.__daemon.shutdown()
self.logger.info("AMNES Worker stopped gracefully, exiting now.")
sys.exit(0)
def __module_execute(self, execid: str, module_instance: NodeModule) -> bool:
"""Run `execute` function for given NodeModule instance.
Args:
execid (str): Module execution ID used by logging.
module_instance (NodeModule): NodeModule instance for which `execute`
should be called.
Returns:
bool: True, if the `execute` function call was successful,
else False.
"""
success = True
try:
module_instance.execute()
except Exception as exc: # pylint: disable=broad-except
success = False
if isinstance(exc, NodeModuleError):
module_instance.error = exc
module_instance.corrupt = False
self.logger.warning(
f"({execid}) Module error occured while running 'execute()': {exc}"
)
else:
module_instance.error = None
module_instance.corrupt = True
self.logger.error(
f"({execid}) "
+ f"Uncatched error occured while running 'execute()': {exc}"
)
else:
module_instance.error = None
module_instance.corrupt = False
return success
def __module_collect(self, execid: str, module_instance: NodeModule) -> bool:
"""Run `collect` function for given NodeModule instance.
Args:
execid (str): Module execution ID used by logging.
module_instance (NodeModule): NodeModule instance for which `collect`
should be called.
Returns:
bool: True, if the `collect` function call was successful,
else False.
"""
success = True
try:
module_instance.collect()
except Exception as exc: # pylint: disable=broad-except
success = False
if isinstance(exc, NodeModuleError):
module_instance.error = exc
module_instance.corrupt = False
self.logger.warning(
f"({execid}) Module error occured while running 'execute()': {exc}"
)
else:
module_instance.error = None
module_instance.corrupt = True
self.logger.error(
f"({execid}) "
+ f"Uncatched error occured while running 'execute()': {exc}"
)
else:
module_instance.error = None
module_instance.corrupt = False
return success
def __module_cleanup(self, execid: str, module_instance: NodeModule) -> bool:
"""Run `cleanup` function for given NodeModule instance.
Args:
execid (str): Module execution ID used by logging.
module_instance (NodeModule): NodeModule instance for which `cleanup`
should be called.
Returns:
bool: True, if the `cleanup` function call was successful,
else False.
"""
success = True
try:
module_instance.cleanup()
except Exception as exc: # pylint: disable=broad-except
success = False
if isinstance(exc, NodeModuleError):
module_instance.error = exc
module_instance.corrupt = False
self.logger.warning(
f"({execid}) Module error occured while running 'execute()': {exc}"
)
else:
module_instance.error = None
module_instance.corrupt = True
self.logger.error(
f"({execid}) "
+ f"Uncatched error occured while running 'execute()': {exc}"
)
return success
def __module_init(
self,
execid: str,
modulecls: Type[_Module],
params: NodeTaskParams,
files: NodeTaskFiles,
workdir: str,
) -> Optional[NodeModule]:
"""Initializes an instance of the given module class.
Args:
execid (str): Module execution ID used by logging.
modulecls (Type): Class from which an instance should be created.
params (NodeTaskParams): Parameters used for module execution.
files (NodeTaskFiles): Files used for module execution.
workdir (str): Working directory for the node module instance.
Returns:
Optional[NodeModule]: Node modules instances created from the given module
class or None if initialization failed.
"""
try:
return modulecls(params, files, workdir)
except Exception as exc: # pylint: disable=broad-except
if isinstance(exc, NodeModuleError):
self.logger.warning(
f"({execid}) Module error occured during instantiation: {exc}"
)
else:
self.logger.error(
f"({execid}) "
+ f"Uncatched error occured while during instantiation: {exc}"
)
return None
def execute_module( # pylint: disable=too-many-locals,too-many-return-statements
self,
execid: str,
module: str,
params: NodeTaskParams,
files: NodeTaskFiles,
controller_address: str,
controller_port: int,
) -> TaskExecutionResult:
"""Executes module with given parameters and files.
Args:
execid (str): Module execution ID used by logging.
module (str): Module which should be executed.
params (NodeTaskParams): Parameters used for module execution.
files (NodeTaskFiles): Files used for module execution.
controller_address (str): Address of controller pyro5 endpoint.
controller_port (int): Port of controller pyro5 endpoint.
Returns:
TaskExecutionResult: Task execution result after execution.
"""
self.logger.info(f"Loading module '{module}' ...")
(module_path, module_class) = tuple(module.rsplit(".", 1))
try:
modulecls: Type = getattr(import_module(module_path), module_class)
except ModuleNotFoundError as merr:
self.logger.error(f"Could not import python module '{module_path}': {merr}")
return TaskExecutionResult.PYTHON_MODULE_NOT_FOUND
except AttributeError as aerr:
self.logger.error(
"Could not import python class "
+ f"'{module_class}' from '{module_path}': {aerr}"
)
return TaskExecutionResult.PYTHON_CLASS_NOT_FOUND
if not issubclass(modulecls, NodeModule):
self.logger.error(f"Specified module '{module}' is not a NodeModule!")
return TaskExecutionResult.MODULE_NOT_NODEMODULE
self.logger.info(f"Successfully loaded node module '{module}'.")
success: bool = True
workdir = f"{self.base}{os.path.sep}worker{os.path.sep}{uuid.uuid4()}"
self.logger.info(f"Preparing working directory '{workdir}' ...")
try:
Path(workdir).mkdir(parents=True)
except OSError as oserr:
self.logger.error(f"Could not prepare working directory: {oserr}")
return TaskExecutionResult.WORKDIR_INIT_FAILED
self.logger.info("Working directory prepared successfully.")
self.logger.info("Initializing function provider ...")
try:
function_provider = TaskFunctionProvider(
controller_address, controller_port
)
except PyroError as perr:
self.logger.error(f"Function provider initialization failed: {perr}")
return TaskExecutionResult.FUNCTIONPROVIDER_INIT_FAILED
self.logger.info("Function provider initialized.")
self.logger.info("Creating module instance ...")
module_instance = self.__module_init(execid, modulecls, params, files, workdir)
if module_instance is None:
return TaskExecutionResult.FAILURE
module_instance.store_io = function_provider.store_io # type: ignore
self.logger.info("Module instance created.")
self.logger.info("Start execution of node module stages ...")
success = self.__module_execute(execid, module_instance)
success = self.__module_collect(execid, module_instance) & success
success = self.__module_cleanup(execid, module_instance) & success
result = TaskExecutionResult.SUCCESS if success else TaskExecutionResult.FAILURE
self.logger.info(f"Execution finished with result '{result}'.")
shutil.rmtree(workdir, ignore_errors=True)
return result
class RemoteWorkerManager:
"""Remote manager for AMNES Worker instance."""
PYROID = "rmtworkermngr"
def __init__(self, worker: Worker) -> None:
"""Remote worker manager constructor method.
Args:
worker (Worker): Linked worker which should be managed.
"""
self.__worker = worker
self.__lock = Lock()
@contextmanager
def __exclusive(self) -> Iterator[None]:
"""Contextmanager for exclusive operation execution in remote manager.
Yields:
None: No yield value available.
Raises:
ConnectionError: If exclusive operation execution is blocked.
"""
if self.__lock.acquire(blocking=True, timeout=2):
try:
yield
finally:
self.__lock.release()
else:
raise ConnectionError("Remote Manager currently blocked, try again later.")
@staticmethod
@Pyro5.expose # type: ignore
def ping() -> str:
"""Returns remote worker manager ping message.
Returns:
str: Remote worker manager ping message.
"""
return f"{RemoteWorkerManager.PYROID}#{__version__}"
@Pyro5.expose # type: ignore
def execute_module(
self,
execid: str,
module: str,
params: NodeTaskParams,
files: NodeTaskFiles,
controller_address: str,
controller_port: int,
) -> TaskExecutionResult:
"""Executes module with given parameters and files.
Args:
execid (str): Module execution ID used by logging.
module (str): Module which should be executed.
params (NodeTaskParams): Parameters used for module execution.
files (NodeTaskFiles): Files used for module execution.
controller_address (str): Address of controller pyro5 endpoint.
controller_port (int): Port of controller pyro5 endpoint.
Returns:
TaskExecutionResult: Task execution result after execution.
"""
with AmnesRemoteException.exception_handling():
return self.__worker.execute_module(
execid, module, params, files, controller_address, controller_port
)
class TaskFunctionProvider: # pylint: disable=too-few-public-methods
"""Provider for functions and methods bound to modules for task execution."""
def __init__(self, controller_address: str, controller_port: int) -> None:
"""Task function provider constructor method.
Args:
controller_address (str): Address of controller pyro5 endpoint.
controller_port (int): Port of controller pyro5 endpoint.
"""
rmngr: RemoteResultManager = Pyro5.Proxy(
f"PYRO:{RemoteResultManager.PYROID}@{controller_address}:{controller_port}"
)
rmngr._pyroTimeout = 10 # type: ignore
rmngr.ping()
self.__resultmanager = rmngr
def store_io(self, stream: BinaryIO) -> None:
"""Method for storing content of an IO stream into a persistent object.
This method can be bound to a node module instance.
Args:
stream (BinaryIO): IO stream which should be stored persistently.
"""
token, (address, port) = self.__resultmanager.request_filetransfer()
if isinstance(ipaddress.ip_address(address), ipaddress.IPv6Address):
family = socket.AF_INET6
else:
family = socket.AF_INET
with socket.socket(family, socket.SOCK_STREAM) as sock:
sock.connect((address, port))
sock.sendall(token.encode())
sock.sendfile(stream)
Classes
class RemoteWorkerManager (worker: Worker)
-
Remote manager for AMNES Worker instance.
Remote worker manager constructor method.
Args
worker
:Worker
- Linked worker which should be managed.
Expand source code
class RemoteWorkerManager: """Remote manager for AMNES Worker instance.""" PYROID = "rmtworkermngr" def __init__(self, worker: Worker) -> None: """Remote worker manager constructor method. Args: worker (Worker): Linked worker which should be managed. """ self.__worker = worker self.__lock = Lock() @contextmanager def __exclusive(self) -> Iterator[None]: """Contextmanager for exclusive operation execution in remote manager. Yields: None: No yield value available. Raises: ConnectionError: If exclusive operation execution is blocked. """ if self.__lock.acquire(blocking=True, timeout=2): try: yield finally: self.__lock.release() else: raise ConnectionError("Remote Manager currently blocked, try again later.") @staticmethod @Pyro5.expose # type: ignore def ping() -> str: """Returns remote worker manager ping message. Returns: str: Remote worker manager ping message. """ return f"{RemoteWorkerManager.PYROID}#{__version__}" @Pyro5.expose # type: ignore def execute_module( self, execid: str, module: str, params: NodeTaskParams, files: NodeTaskFiles, controller_address: str, controller_port: int, ) -> TaskExecutionResult: """Executes module with given parameters and files. Args: execid (str): Module execution ID used by logging. module (str): Module which should be executed. params (NodeTaskParams): Parameters used for module execution. files (NodeTaskFiles): Files used for module execution. controller_address (str): Address of controller pyro5 endpoint. controller_port (int): Port of controller pyro5 endpoint. Returns: TaskExecutionResult: Task execution result after execution. """ with AmnesRemoteException.exception_handling(): return self.__worker.execute_module( execid, module, params, files, controller_address, controller_port )
Class variables
var PYROID
Static methods
def ping() -> str
-
Returns remote worker manager ping message.
Returns
str
- Remote worker manager ping message.
Expand source code
@staticmethod @Pyro5.expose # type: ignore def ping() -> str: """Returns remote worker manager ping message. Returns: str: Remote worker manager ping message. """ return f"{RemoteWorkerManager.PYROID}#{__version__}"
Methods
def execute_module(self, execid: str, module: str, params: NodeTaskParams, files: NodeTaskFiles, controller_address: str, controller_port: int) -> TaskExecutionResult
-
Executes module with given parameters and files.
Args
execid
:str
- Module execution ID used by logging.
module
:str
- Module which should be executed.
params
:NodeTaskParams
- Parameters used for module execution.
files
:NodeTaskFiles
- Files used for module execution.
controller_address
:str
- Address of controller pyro5 endpoint.
controller_port
:int
- Port of controller pyro5 endpoint.
Returns
TaskExecutionResult
- Task execution result after execution.
Expand source code
@Pyro5.expose # type: ignore def execute_module( self, execid: str, module: str, params: NodeTaskParams, files: NodeTaskFiles, controller_address: str, controller_port: int, ) -> TaskExecutionResult: """Executes module with given parameters and files. Args: execid (str): Module execution ID used by logging. module (str): Module which should be executed. params (NodeTaskParams): Parameters used for module execution. files (NodeTaskFiles): Files used for module execution. controller_address (str): Address of controller pyro5 endpoint. controller_port (int): Port of controller pyro5 endpoint. Returns: TaskExecutionResult: Task execution result after execution. """ with AmnesRemoteException.exception_handling(): return self.__worker.execute_module( execid, module, params, files, controller_address, controller_port )
class TaskExecutionResult (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Task Execution Results.
This enumeration contains all valid results for a task execution.
Expand source code
class TaskExecutionResult(Enum): """Task Execution Results. This enumeration contains all valid results for a task execution. """ def __new__(cls, value: int, doc: str) -> TaskExecutionResult: """Custom initializer supporting docstrings for enumeration members. Args: cls (TaskExecutionResult): TaskExecutionResult class. value (int): Internal integer value used for state enum member. doc (str): Docstring for state enum member. Returns: TaskExecutionResult: TaskExecutionResult instance. """ self = object.__new__(cls) self._value_ = value self.__doc__ = doc return self SUCCESS = (0, "Task execution was successful.") PYTHON_MODULE_NOT_FOUND = ( 11, "Could not import python module containing module class.", ) PYTHON_CLASS_NOT_FOUND = ( 12, "Could not find python class representing node module.", ) MODULE_NOT_NODEMODULE = (13, "Module is not a subclass of NodeModule.") WORKDIR_INIT_FAILED = (14, "Working directory initialization failed.") FUNCTIONPROVIDER_INIT_FAILED = (15, "Function provider initialization failed.") FAILURE = (255, "Task execution failed.") def __str__(self) -> str: """Get string representation of enum member. Returns: str: String representation of enum member. """ return super().__str__().rsplit(".", 1)[-1]
Ancestors
- enum.Enum
Class variables
var FAILURE
var FUNCTIONPROVIDER_INIT_FAILED
var MODULE_NOT_NODEMODULE
var PYTHON_CLASS_NOT_FOUND
var PYTHON_MODULE_NOT_FOUND
var SUCCESS
var WORKDIR_INIT_FAILED
class TaskFunctionProvider (controller_address: str, controller_port: int)
-
Provider for functions and methods bound to modules for task execution.
Task function provider constructor method.
Args
controller_address
:str
- Address of controller pyro5 endpoint.
controller_port
:int
- Port of controller pyro5 endpoint.
Expand source code
class TaskFunctionProvider: # pylint: disable=too-few-public-methods """Provider for functions and methods bound to modules for task execution.""" def __init__(self, controller_address: str, controller_port: int) -> None: """Task function provider constructor method. Args: controller_address (str): Address of controller pyro5 endpoint. controller_port (int): Port of controller pyro5 endpoint. """ rmngr: RemoteResultManager = Pyro5.Proxy( f"PYRO:{RemoteResultManager.PYROID}@{controller_address}:{controller_port}" ) rmngr._pyroTimeout = 10 # type: ignore rmngr.ping() self.__resultmanager = rmngr def store_io(self, stream: BinaryIO) -> None: """Method for storing content of an IO stream into a persistent object. This method can be bound to a node module instance. Args: stream (BinaryIO): IO stream which should be stored persistently. """ token, (address, port) = self.__resultmanager.request_filetransfer() if isinstance(ipaddress.ip_address(address), ipaddress.IPv6Address): family = socket.AF_INET6 else: family = socket.AF_INET with socket.socket(family, socket.SOCK_STREAM) as sock: sock.connect((address, port)) sock.sendall(token.encode()) sock.sendfile(stream)
Methods
def store_io(self, stream: BinaryIO) -> NoneType
-
Method for storing content of an IO stream into a persistent object.
This method can be bound to a node module instance.
Args
stream
:BinaryIO
- IO stream which should be stored persistently.
Expand source code
def store_io(self, stream: BinaryIO) -> None: """Method for storing content of an IO stream into a persistent object. This method can be bound to a node module instance. Args: stream (BinaryIO): IO stream which should be stored persistently. """ token, (address, port) = self.__resultmanager.request_filetransfer() if isinstance(ipaddress.ip_address(address), ipaddress.IPv6Address): family = socket.AF_INET6 else: family = socket.AF_INET with socket.socket(family, socket.SOCK_STREAM) as sock: sock.connect((address, port)) sock.sendall(token.encode()) sock.sendfile(stream)
class Worker (*args, **kwds)
-
AMNES Worker application.
Attributes
configuration
:WorkerConfiguration
- Initialized configuration for the execution application.
base
:str
- AMNES base directory for execution application.
Constructor method for worker application.
Args
configuration
:WorkerConfiguration
- Initialized configuration for the worker application.
base
:str
- AMNES base directory for worker application.
debug
:bool
- If debug messages should be logged.
Expand source code
class Worker( ExecutionApp[WorkerConfiguration] ): # pylint: disable=unsubscriptable-object """AMNES Worker application. Attributes: configuration (WorkerConfiguration): Initialized configuration for the execution application. base (str): AMNES base directory for execution application. """ APPID = "worker" def __init__( self, configuration: WorkerConfiguration, base: str, debug: bool ) -> None: """Constructor method for worker application. Args: configuration (WorkerConfiguration): Initialized configuration for the worker application. base (str): AMNES base directory for worker application. debug (bool): If debug messages should be logged. """ super().__init__(Worker.APPID, configuration, base, debug) self.__daemon: Optional[Pyro5.Daemon] = None def logic(self) -> None: """AMNES Worker application logic.""" self.logger.info("Initializing Pyro5 endpoint ...") self.__daemon = Pyro5.Daemon( host=self.configuration.execution.address, port=self.configuration.execution.port, ) self.__daemon.register(RemoteWorkerManager(self), RemoteWorkerManager.PYROID) self.logger.info("Pyro5 endpoint initialized.") self.logger.info( "AMNES Worker started successfully, accepting connections now." ) self.__daemon.requestLoop() def shutdown(self) -> None: """AMNES Worker application shutdown handler.""" self.logger.info("Received shutdown signal for AMNES Worker, shutting down ...") if self.__daemon: self.__daemon.shutdown() self.logger.info("AMNES Worker stopped gracefully, exiting now.") sys.exit(0) def __module_execute(self, execid: str, module_instance: NodeModule) -> bool: """Run `execute` function for given NodeModule instance. Args: execid (str): Module execution ID used by logging. module_instance (NodeModule): NodeModule instance for which `execute` should be called. Returns: bool: True, if the `execute` function call was successful, else False. """ success = True try: module_instance.execute() except Exception as exc: # pylint: disable=broad-except success = False if isinstance(exc, NodeModuleError): module_instance.error = exc module_instance.corrupt = False self.logger.warning( f"({execid}) Module error occured while running 'execute()': {exc}" ) else: module_instance.error = None module_instance.corrupt = True self.logger.error( f"({execid}) " + f"Uncatched error occured while running 'execute()': {exc}" ) else: module_instance.error = None module_instance.corrupt = False return success def __module_collect(self, execid: str, module_instance: NodeModule) -> bool: """Run `collect` function for given NodeModule instance. Args: execid (str): Module execution ID used by logging. module_instance (NodeModule): NodeModule instance for which `collect` should be called. Returns: bool: True, if the `collect` function call was successful, else False. """ success = True try: module_instance.collect() except Exception as exc: # pylint: disable=broad-except success = False if isinstance(exc, NodeModuleError): module_instance.error = exc module_instance.corrupt = False self.logger.warning( f"({execid}) Module error occured while running 'execute()': {exc}" ) else: module_instance.error = None module_instance.corrupt = True self.logger.error( f"({execid}) " + f"Uncatched error occured while running 'execute()': {exc}" ) else: module_instance.error = None module_instance.corrupt = False return success def __module_cleanup(self, execid: str, module_instance: NodeModule) -> bool: """Run `cleanup` function for given NodeModule instance. Args: execid (str): Module execution ID used by logging. module_instance (NodeModule): NodeModule instance for which `cleanup` should be called. Returns: bool: True, if the `cleanup` function call was successful, else False. """ success = True try: module_instance.cleanup() except Exception as exc: # pylint: disable=broad-except success = False if isinstance(exc, NodeModuleError): module_instance.error = exc module_instance.corrupt = False self.logger.warning( f"({execid}) Module error occured while running 'execute()': {exc}" ) else: module_instance.error = None module_instance.corrupt = True self.logger.error( f"({execid}) " + f"Uncatched error occured while running 'execute()': {exc}" ) return success def __module_init( self, execid: str, modulecls: Type[_Module], params: NodeTaskParams, files: NodeTaskFiles, workdir: str, ) -> Optional[NodeModule]: """Initializes an instance of the given module class. Args: execid (str): Module execution ID used by logging. modulecls (Type): Class from which an instance should be created. params (NodeTaskParams): Parameters used for module execution. files (NodeTaskFiles): Files used for module execution. workdir (str): Working directory for the node module instance. Returns: Optional[NodeModule]: Node modules instances created from the given module class or None if initialization failed. """ try: return modulecls(params, files, workdir) except Exception as exc: # pylint: disable=broad-except if isinstance(exc, NodeModuleError): self.logger.warning( f"({execid}) Module error occured during instantiation: {exc}" ) else: self.logger.error( f"({execid}) " + f"Uncatched error occured while during instantiation: {exc}" ) return None def execute_module( # pylint: disable=too-many-locals,too-many-return-statements self, execid: str, module: str, params: NodeTaskParams, files: NodeTaskFiles, controller_address: str, controller_port: int, ) -> TaskExecutionResult: """Executes module with given parameters and files. Args: execid (str): Module execution ID used by logging. module (str): Module which should be executed. params (NodeTaskParams): Parameters used for module execution. files (NodeTaskFiles): Files used for module execution. controller_address (str): Address of controller pyro5 endpoint. controller_port (int): Port of controller pyro5 endpoint. Returns: TaskExecutionResult: Task execution result after execution. """ self.logger.info(f"Loading module '{module}' ...") (module_path, module_class) = tuple(module.rsplit(".", 1)) try: modulecls: Type = getattr(import_module(module_path), module_class) except ModuleNotFoundError as merr: self.logger.error(f"Could not import python module '{module_path}': {merr}") return TaskExecutionResult.PYTHON_MODULE_NOT_FOUND except AttributeError as aerr: self.logger.error( "Could not import python class " + f"'{module_class}' from '{module_path}': {aerr}" ) return TaskExecutionResult.PYTHON_CLASS_NOT_FOUND if not issubclass(modulecls, NodeModule): self.logger.error(f"Specified module '{module}' is not a NodeModule!") return TaskExecutionResult.MODULE_NOT_NODEMODULE self.logger.info(f"Successfully loaded node module '{module}'.") success: bool = True workdir = f"{self.base}{os.path.sep}worker{os.path.sep}{uuid.uuid4()}" self.logger.info(f"Preparing working directory '{workdir}' ...") try: Path(workdir).mkdir(parents=True) except OSError as oserr: self.logger.error(f"Could not prepare working directory: {oserr}") return TaskExecutionResult.WORKDIR_INIT_FAILED self.logger.info("Working directory prepared successfully.") self.logger.info("Initializing function provider ...") try: function_provider = TaskFunctionProvider( controller_address, controller_port ) except PyroError as perr: self.logger.error(f"Function provider initialization failed: {perr}") return TaskExecutionResult.FUNCTIONPROVIDER_INIT_FAILED self.logger.info("Function provider initialized.") self.logger.info("Creating module instance ...") module_instance = self.__module_init(execid, modulecls, params, files, workdir) if module_instance is None: return TaskExecutionResult.FAILURE module_instance.store_io = function_provider.store_io # type: ignore self.logger.info("Module instance created.") self.logger.info("Start execution of node module stages ...") success = self.__module_execute(execid, module_instance) success = self.__module_collect(execid, module_instance) & success success = self.__module_cleanup(execid, module_instance) & success result = TaskExecutionResult.SUCCESS if success else TaskExecutionResult.FAILURE self.logger.info(f"Execution finished with result '{result}'.") shutil.rmtree(workdir, ignore_errors=True) return result
Ancestors
- ExecutionApp
- typing.Generic
- InstanceLogging
Class variables
var APPID
Methods
def execute_module(self, execid: str, module: str, params: NodeTaskParams, files: NodeTaskFiles, controller_address: str, controller_port: int) -> TaskExecutionResult
-
Executes module with given parameters and files.
Args
execid
:str
- Module execution ID used by logging.
module
:str
- Module which should be executed.
params
:NodeTaskParams
- Parameters used for module execution.
files
:NodeTaskFiles
- Files used for module execution.
controller_address
:str
- Address of controller pyro5 endpoint.
controller_port
:int
- Port of controller pyro5 endpoint.
Returns
TaskExecutionResult
- Task execution result after execution.
Expand source code
def execute_module( # pylint: disable=too-many-locals,too-many-return-statements self, execid: str, module: str, params: NodeTaskParams, files: NodeTaskFiles, controller_address: str, controller_port: int, ) -> TaskExecutionResult: """Executes module with given parameters and files. Args: execid (str): Module execution ID used by logging. module (str): Module which should be executed. params (NodeTaskParams): Parameters used for module execution. files (NodeTaskFiles): Files used for module execution. controller_address (str): Address of controller pyro5 endpoint. controller_port (int): Port of controller pyro5 endpoint. Returns: TaskExecutionResult: Task execution result after execution. """ self.logger.info(f"Loading module '{module}' ...") (module_path, module_class) = tuple(module.rsplit(".", 1)) try: modulecls: Type = getattr(import_module(module_path), module_class) except ModuleNotFoundError as merr: self.logger.error(f"Could not import python module '{module_path}': {merr}") return TaskExecutionResult.PYTHON_MODULE_NOT_FOUND except AttributeError as aerr: self.logger.error( "Could not import python class " + f"'{module_class}' from '{module_path}': {aerr}" ) return TaskExecutionResult.PYTHON_CLASS_NOT_FOUND if not issubclass(modulecls, NodeModule): self.logger.error(f"Specified module '{module}' is not a NodeModule!") return TaskExecutionResult.MODULE_NOT_NODEMODULE self.logger.info(f"Successfully loaded node module '{module}'.") success: bool = True workdir = f"{self.base}{os.path.sep}worker{os.path.sep}{uuid.uuid4()}" self.logger.info(f"Preparing working directory '{workdir}' ...") try: Path(workdir).mkdir(parents=True) except OSError as oserr: self.logger.error(f"Could not prepare working directory: {oserr}") return TaskExecutionResult.WORKDIR_INIT_FAILED self.logger.info("Working directory prepared successfully.") self.logger.info("Initializing function provider ...") try: function_provider = TaskFunctionProvider( controller_address, controller_port ) except PyroError as perr: self.logger.error(f"Function provider initialization failed: {perr}") return TaskExecutionResult.FUNCTIONPROVIDER_INIT_FAILED self.logger.info("Function provider initialized.") self.logger.info("Creating module instance ...") module_instance = self.__module_init(execid, modulecls, params, files, workdir) if module_instance is None: return TaskExecutionResult.FAILURE module_instance.store_io = function_provider.store_io # type: ignore self.logger.info("Module instance created.") self.logger.info("Start execution of node module stages ...") success = self.__module_execute(execid, module_instance) success = self.__module_collect(execid, module_instance) & success success = self.__module_cleanup(execid, module_instance) & success result = TaskExecutionResult.SUCCESS if success else TaskExecutionResult.FAILURE self.logger.info(f"Execution finished with result '{result}'.") shutil.rmtree(workdir, ignore_errors=True) return result
def logic(self) -> NoneType
-
AMNES Worker application logic.
Expand source code
def logic(self) -> None: """AMNES Worker application logic.""" self.logger.info("Initializing Pyro5 endpoint ...") self.__daemon = Pyro5.Daemon( host=self.configuration.execution.address, port=self.configuration.execution.port, ) self.__daemon.register(RemoteWorkerManager(self), RemoteWorkerManager.PYROID) self.logger.info("Pyro5 endpoint initialized.") self.logger.info( "AMNES Worker started successfully, accepting connections now." ) self.__daemon.requestLoop()
def shutdown(self) -> NoneType
-
AMNES Worker application shutdown handler.
Expand source code
def shutdown(self) -> None: """AMNES Worker application shutdown handler.""" self.logger.info("Received shutdown signal for AMNES Worker, shutting down ...") if self.__daemon: self.__daemon.shutdown() self.logger.info("AMNES Worker stopped gracefully, exiting now.") sys.exit(0)
Inherited members