Module amnes.exec.app
This module contains all classes and functions for a basic execution app.
Classes
AmnesRemoteException: Exception raised by AMNES Components to pass errors to remote destinations. ExecutionApp: Abstract generic execution application used as base for AMNES components.
Expand source code
"""This module contains all classes and functions for a basic execution app.
Classes:
AmnesRemoteException: Exception raised by AMNES Components to pass errors
to remote destinations.
ExecutionApp: Abstract generic execution application used as base for
AMNES components.
"""
from __future__ import annotations
import logging
import signal
import sys
import threading
import traceback as tb
from abc import ABCMeta, abstractmethod
from contextlib import contextmanager
from logging import Logger
from threading import Lock
from types import FrameType, TracebackType
from typing import Dict, Generic, Iterator, Optional, Type, TypeVar
from Pyro5.errors import get_pyro_traceback
from Pyro5.serializers import SerializerBase
from ..core.node_task import NodeTaskFiles, NodeTaskParams
from ..exec.config import ExecutionConfiguration
from ..exec.logging import AmnesLogger, InstanceLogging, create_handlers
from . import patcher
_Config = TypeVar("_Config", bound=ExecutionConfiguration)
class AmnesRemoteException(Exception):
"""Exception raised by AMNES Components to pass errors to remote destinations."""
LOGGER: Optional[Logger] = None
@staticmethod
def deserialize(
classname: str, indict: Dict # pylint: disable=unused-argument
) -> AmnesRemoteException:
"""Deserialize AmnesRemoteException dictionary from Pyro communication.
Args:
classname (str): Name of class which was serialized.
indict (Dict): Serialized representation of instance as dictionary.
Returns:
AmnesRemoteException: Instance deserialized from dictionary representation.
"""
exc = AmnesRemoteException()
# pylint: disable=protected-access
exc._pyroTraceback = indict["attributes"]["_pyroTraceback"] # type: ignore
return exc
@staticmethod
@contextmanager
def exception_handling() -> Iterator[None]:
"""Contextmanager for exception handling of worker remote exceptions.
Yields:
None: No yield value available.
Raises:
AmnesRemoteException: If any exception was raised in context.
"""
try:
yield
except Exception as exc:
type_, value, traceback = sys.exc_info()
AmnesRemoteException.exception_hook_sys(type_, value, traceback)
raise AmnesRemoteException() from exc
@staticmethod
def exception_hook_sys(
type_: Optional[Type[BaseException]],
value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
"""Python exception hook for default system.
Args:
type_ (Type[BaseException]): Type of raised exception.
value (BaseException): Raised exception.
traceback (TracebackType): Traceback of raised exception.
"""
if hasattr(value, "_pyroTraceback"):
AmnesRemoteException.__log(
logging.ERROR, "".join(get_pyro_traceback(type_, value, traceback))
)
else:
AmnesRemoteException.__log(
logging.ERROR, "".join(tb.format_exception(type_, value, traceback))
)
@staticmethod
def exception_hook_threads(args) -> None: # type: ignore
"""Python exception hook for threads.
Args:
args (ExceptionHookArgs): Exception hook arguments.
"""
AmnesRemoteException.exception_hook_sys(
args.exc_type, args.exc_value, args.exc_traceback
)
@staticmethod
def __log(level: int, msg: str) -> None:
"""Logging function with STDERR fallback.
Args:
level (int): Log level used for logging.
msg (str): Message which should be logged.
"""
if AmnesRemoteException.LOGGER is None:
print(
"Logger for exceptions not configured, using fallback!", file=sys.stderr
)
print(msg, file=sys.stderr)
else:
AmnesRemoteException.LOGGER.log(level, msg)
class ExecutionApp(Generic[_Config], InstanceLogging, metaclass=ABCMeta):
"""Abstract generic execution application used as base for AMNES components.
Every AMNES component which should be executed as a service must inherit
from this class.
Generic Types:
_Config: Subclass of `ExecutionConfiguration`.
Attributes:
configuration (_Config): Initialized configuration for the execution
application.
base (str): AMNES base directory for execution application.
"""
def __init__(self, appid: str, configuration: _Config, base: str, debug: bool):
"""Generic constructor method for execution application.
Args:
appid (str): Application ID used for logging.
configuration (_Config): Initialized configuration for the execution
application.
base (str): AMNES base directory for execution application.
debug (bool): If debug messages should be logged.
"""
threading.current_thread().name = "App"
patcher.patch()
self.__set_configuration(configuration)
self.__set_base(base)
InstanceLogging.__init__(self, self.__configure_logging(appid, debug))
self.__shutdown_lock = Lock()
self.__register_serializer()
def __configure_logging(self, appid: str, debug: bool) -> Logger:
"""Configure logging for execution application.
Args:
appid (str): Application ID used for logging.
debug (bool): If debug messages should be logged.
Returns:
Logger: Logger instance for InstanceLogging constructor.
"""
app_logger = AmnesLogger().logger.getChild(appid)
app_logger.setLevel(logging.DEBUG if debug else logging.INFO)
for handler in create_handlers(self.configuration.execution.logging):
app_logger.addHandler(handler)
AmnesRemoteException.LOGGER = app_logger
if app_logger.level == logging.DEBUG:
app_logger.debug("Debug Level for Logging enabled.")
return app_logger
def execute(self) -> None:
"""Execution method which is called to start the application.
Before calling `self.logic()`, a signal hander is registered
to catch `SIGINT` and call `self.shutdown()` if it is catched.
"""
def signalhandler(
sig: int, frame: FrameType # pylint: disable=unused-argument
) -> None:
if self.__shutdown_lock.acquire():
self.shutdown()
sys.exit(0)
# Only one SIGINT handler should run.
# This call will end with an exit, so lock is not released.
signal.signal(signal.SIGINT, signalhandler)
sys.excepthook = AmnesRemoteException.exception_hook_sys
threading.excepthook = AmnesRemoteException.exception_hook_threads
self.logic()
@abstractmethod
def logic(self) -> None:
"""Abstract method running the main logic of the execution application.
This method must be implemented by all subclasses of `ExecutionApp` and
is called by `self.execute()` on application execution.
"""
@abstractmethod
def shutdown(self) -> None:
"""Abstract method for shutting down the execution application.
This method must be implemented by all subclasses of `ExecutionApp` and
is called on `SIGINT` to shutdown the application.
If no `sys.exit()` is called in the implementation of this method,
the application will exit with exit code 0.
"""
@property
def configuration(self) -> _Config:
"""_Config: Initialized configuration for the execution application.
Returns:
_Config: Initialized configuration for the execution application.
"""
return self.__configuration
def __set_configuration(self, configuration: _Config) -> None:
"""Set configuration for the execution application.
Args:
configuration (_Config): Initialized configuration for the execution
application.
"""
self.__configuration: _Config = configuration
@property
def base(self) -> str:
"""str: AMNES base directory for execution application.
Returns:
str: AMNES base directory for execution application.
"""
return self.__base
def __set_base(self, base: str) -> None:
"""Set AMNES base directory for execution application.
Args:
base (str): AMNES base directory for execution application.
"""
self.__base: str = base
@staticmethod
def __register_serializer() -> None:
"""Register serializers and deserializers for Pyro communication."""
SerializerBase.register_dict_to_class(
f"{AmnesRemoteException.__module__}.{AmnesRemoteException.__name__}",
AmnesRemoteException.deserialize,
)
SerializerBase.register_class_to_dict(NodeTaskParams, NodeTaskParams.serialize)
SerializerBase.register_dict_to_class(
f"{NodeTaskParams.__module__}.{NodeTaskParams.__name__}",
NodeTaskParams.deserialize,
)
SerializerBase.register_class_to_dict(NodeTaskFiles, NodeTaskFiles.serialize)
SerializerBase.register_dict_to_class(
f"{NodeTaskFiles.__module__}.{NodeTaskFiles.__name__}",
NodeTaskFiles.deserialize,
)
Classes
class AmnesRemoteException (...)
-
Exception raised by AMNES Components to pass errors to remote destinations.
Expand source code
class AmnesRemoteException(Exception): """Exception raised by AMNES Components to pass errors to remote destinations.""" LOGGER: Optional[Logger] = None @staticmethod def deserialize( classname: str, indict: Dict # pylint: disable=unused-argument ) -> AmnesRemoteException: """Deserialize AmnesRemoteException dictionary from Pyro communication. Args: classname (str): Name of class which was serialized. indict (Dict): Serialized representation of instance as dictionary. Returns: AmnesRemoteException: Instance deserialized from dictionary representation. """ exc = AmnesRemoteException() # pylint: disable=protected-access exc._pyroTraceback = indict["attributes"]["_pyroTraceback"] # type: ignore return exc @staticmethod @contextmanager def exception_handling() -> Iterator[None]: """Contextmanager for exception handling of worker remote exceptions. Yields: None: No yield value available. Raises: AmnesRemoteException: If any exception was raised in context. """ try: yield except Exception as exc: type_, value, traceback = sys.exc_info() AmnesRemoteException.exception_hook_sys(type_, value, traceback) raise AmnesRemoteException() from exc @staticmethod def exception_hook_sys( type_: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType], ) -> None: """Python exception hook for default system. Args: type_ (Type[BaseException]): Type of raised exception. value (BaseException): Raised exception. traceback (TracebackType): Traceback of raised exception. """ if hasattr(value, "_pyroTraceback"): AmnesRemoteException.__log( logging.ERROR, "".join(get_pyro_traceback(type_, value, traceback)) ) else: AmnesRemoteException.__log( logging.ERROR, "".join(tb.format_exception(type_, value, traceback)) ) @staticmethod def exception_hook_threads(args) -> None: # type: ignore """Python exception hook for threads. Args: args (ExceptionHookArgs): Exception hook arguments. """ AmnesRemoteException.exception_hook_sys( args.exc_type, args.exc_value, args.exc_traceback ) @staticmethod def __log(level: int, msg: str) -> None: """Logging function with STDERR fallback. Args: level (int): Log level used for logging. msg (str): Message which should be logged. """ if AmnesRemoteException.LOGGER is None: print( "Logger for exceptions not configured, using fallback!", file=sys.stderr ) print(msg, file=sys.stderr) else: AmnesRemoteException.LOGGER.log(level, msg)
Ancestors
- builtins.Exception
- builtins.BaseException
Class variables
var LOGGER : Union[logging.Logger, NoneType]
Static methods
def deserialize(classname: str, indict: Dict) -> AmnesRemoteException
-
Deserialize AmnesRemoteException dictionary from Pyro communication.
Args
classname
:str
- Name of class which was serialized.
indict
:Dict
- Serialized representation of instance as dictionary.
Returns
AmnesRemoteException
- Instance deserialized from dictionary representation.
Expand source code
@staticmethod def deserialize( classname: str, indict: Dict # pylint: disable=unused-argument ) -> AmnesRemoteException: """Deserialize AmnesRemoteException dictionary from Pyro communication. Args: classname (str): Name of class which was serialized. indict (Dict): Serialized representation of instance as dictionary. Returns: AmnesRemoteException: Instance deserialized from dictionary representation. """ exc = AmnesRemoteException() # pylint: disable=protected-access exc._pyroTraceback = indict["attributes"]["_pyroTraceback"] # type: ignore return exc
def exception_handling() -> Iterator[NoneType]
-
Contextmanager for exception handling of worker remote exceptions.
Yields
None
- No yield value available.
Raises
AmnesRemoteException
- If any exception was raised in context.
Expand source code
@staticmethod @contextmanager def exception_handling() -> Iterator[None]: """Contextmanager for exception handling of worker remote exceptions. Yields: None: No yield value available. Raises: AmnesRemoteException: If any exception was raised in context. """ try: yield except Exception as exc: type_, value, traceback = sys.exc_info() AmnesRemoteException.exception_hook_sys(type_, value, traceback) raise AmnesRemoteException() from exc
def exception_hook_sys(type_: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]) -> NoneType
-
Python exception hook for default system.
Args
type_
:Type[BaseException]
- Type of raised exception.
value
:BaseException
- Raised exception.
traceback
:TracebackType
- Traceback of raised exception.
Expand source code
@staticmethod def exception_hook_sys( type_: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType], ) -> None: """Python exception hook for default system. Args: type_ (Type[BaseException]): Type of raised exception. value (BaseException): Raised exception. traceback (TracebackType): Traceback of raised exception. """ if hasattr(value, "_pyroTraceback"): AmnesRemoteException.__log( logging.ERROR, "".join(get_pyro_traceback(type_, value, traceback)) ) else: AmnesRemoteException.__log( logging.ERROR, "".join(tb.format_exception(type_, value, traceback)) )
def exception_hook_threads(args) -> NoneType
-
Python exception hook for threads.
Args
args
:ExceptionHookArgs
- Exception hook arguments.
Expand source code
@staticmethod def exception_hook_threads(args) -> None: # type: ignore """Python exception hook for threads. Args: args (ExceptionHookArgs): Exception hook arguments. """ AmnesRemoteException.exception_hook_sys( args.exc_type, args.exc_value, args.exc_traceback )
class ExecutionApp (*args, **kwds)
-
Abstract generic execution application used as base for AMNES components.
Every AMNES component which should be executed as a service must inherit from this class.
Generic Types: _Config: Subclass of
ExecutionConfiguration
.Attributes
configuration
:_Config
- Initialized configuration for the execution application.
base
:str
- AMNES base directory for execution application.
Generic constructor method for execution application.
Args
appid
:str
- Application ID used for logging.
configuration
:_Config
- Initialized configuration for the execution application.
base
:str
- AMNES base directory for execution application.
debug
:bool
- If debug messages should be logged.
Expand source code
class ExecutionApp(Generic[_Config], InstanceLogging, metaclass=ABCMeta): """Abstract generic execution application used as base for AMNES components. Every AMNES component which should be executed as a service must inherit from this class. Generic Types: _Config: Subclass of `ExecutionConfiguration`. Attributes: configuration (_Config): Initialized configuration for the execution application. base (str): AMNES base directory for execution application. """ def __init__(self, appid: str, configuration: _Config, base: str, debug: bool): """Generic constructor method for execution application. Args: appid (str): Application ID used for logging. configuration (_Config): Initialized configuration for the execution application. base (str): AMNES base directory for execution application. debug (bool): If debug messages should be logged. """ threading.current_thread().name = "App" patcher.patch() self.__set_configuration(configuration) self.__set_base(base) InstanceLogging.__init__(self, self.__configure_logging(appid, debug)) self.__shutdown_lock = Lock() self.__register_serializer() def __configure_logging(self, appid: str, debug: bool) -> Logger: """Configure logging for execution application. Args: appid (str): Application ID used for logging. debug (bool): If debug messages should be logged. Returns: Logger: Logger instance for InstanceLogging constructor. """ app_logger = AmnesLogger().logger.getChild(appid) app_logger.setLevel(logging.DEBUG if debug else logging.INFO) for handler in create_handlers(self.configuration.execution.logging): app_logger.addHandler(handler) AmnesRemoteException.LOGGER = app_logger if app_logger.level == logging.DEBUG: app_logger.debug("Debug Level for Logging enabled.") return app_logger def execute(self) -> None: """Execution method which is called to start the application. Before calling `self.logic()`, a signal hander is registered to catch `SIGINT` and call `self.shutdown()` if it is catched. """ def signalhandler( sig: int, frame: FrameType # pylint: disable=unused-argument ) -> None: if self.__shutdown_lock.acquire(): self.shutdown() sys.exit(0) # Only one SIGINT handler should run. # This call will end with an exit, so lock is not released. signal.signal(signal.SIGINT, signalhandler) sys.excepthook = AmnesRemoteException.exception_hook_sys threading.excepthook = AmnesRemoteException.exception_hook_threads self.logic() @abstractmethod def logic(self) -> None: """Abstract method running the main logic of the execution application. This method must be implemented by all subclasses of `ExecutionApp` and is called by `self.execute()` on application execution. """ @abstractmethod def shutdown(self) -> None: """Abstract method for shutting down the execution application. This method must be implemented by all subclasses of `ExecutionApp` and is called on `SIGINT` to shutdown the application. If no `sys.exit()` is called in the implementation of this method, the application will exit with exit code 0. """ @property def configuration(self) -> _Config: """_Config: Initialized configuration for the execution application. Returns: _Config: Initialized configuration for the execution application. """ return self.__configuration def __set_configuration(self, configuration: _Config) -> None: """Set configuration for the execution application. Args: configuration (_Config): Initialized configuration for the execution application. """ self.__configuration: _Config = configuration @property def base(self) -> str: """str: AMNES base directory for execution application. Returns: str: AMNES base directory for execution application. """ return self.__base def __set_base(self, base: str) -> None: """Set AMNES base directory for execution application. Args: base (str): AMNES base directory for execution application. """ self.__base: str = base @staticmethod def __register_serializer() -> None: """Register serializers and deserializers for Pyro communication.""" SerializerBase.register_dict_to_class( f"{AmnesRemoteException.__module__}.{AmnesRemoteException.__name__}", AmnesRemoteException.deserialize, ) SerializerBase.register_class_to_dict(NodeTaskParams, NodeTaskParams.serialize) SerializerBase.register_dict_to_class( f"{NodeTaskParams.__module__}.{NodeTaskParams.__name__}", NodeTaskParams.deserialize, ) SerializerBase.register_class_to_dict(NodeTaskFiles, NodeTaskFiles.serialize) SerializerBase.register_dict_to_class( f"{NodeTaskFiles.__module__}.{NodeTaskFiles.__name__}", NodeTaskFiles.deserialize, )
Ancestors
- typing.Generic
- InstanceLogging
Subclasses
Instance variables
var base : str
-
str: AMNES base directory for execution application.
Returns
str
- AMNES base directory for execution application.
Expand source code
@property def base(self) -> str: """str: AMNES base directory for execution application. Returns: str: AMNES base directory for execution application. """ return self.__base
var configuration : ~_Config
-
_Config: Initialized configuration for the execution application.
Returns
_Config
- Initialized configuration for the execution application.
Expand source code
@property def configuration(self) -> _Config: """_Config: Initialized configuration for the execution application. Returns: _Config: Initialized configuration for the execution application. """ return self.__configuration
Methods
def execute(self) -> NoneType
-
Execution method which is called to start the application.
Before calling
self.logic()
, a signal hander is registered to catchSIGINT
and callself.shutdown()
if it is catched.Expand source code
def execute(self) -> None: """Execution method which is called to start the application. Before calling `self.logic()`, a signal hander is registered to catch `SIGINT` and call `self.shutdown()` if it is catched. """ def signalhandler( sig: int, frame: FrameType # pylint: disable=unused-argument ) -> None: if self.__shutdown_lock.acquire(): self.shutdown() sys.exit(0) # Only one SIGINT handler should run. # This call will end with an exit, so lock is not released. signal.signal(signal.SIGINT, signalhandler) sys.excepthook = AmnesRemoteException.exception_hook_sys threading.excepthook = AmnesRemoteException.exception_hook_threads self.logic()
def logic(self) -> NoneType
-
Abstract method running the main logic of the execution application.
This method must be implemented by all subclasses of
ExecutionApp
and is called byself.execute()
on application execution.Expand source code
@abstractmethod def logic(self) -> None: """Abstract method running the main logic of the execution application. This method must be implemented by all subclasses of `ExecutionApp` and is called by `self.execute()` on application execution. """
def shutdown(self) -> NoneType
-
Abstract method for shutting down the execution application.
This method must be implemented by all subclasses of
ExecutionApp
and is called onSIGINT
to shutdown the application. If nosys.exit()
is called in the implementation of this method, the application will exit with exit code 0.Expand source code
@abstractmethod def shutdown(self) -> None: """Abstract method for shutting down the execution application. This method must be implemented by all subclasses of `ExecutionApp` and is called on `SIGINT` to shutdown the application. If no `sys.exit()` is called in the implementation of this method, the application will exit with exit code 0. """
Inherited members