Source code for ewokscore.events.contexts

"""Contexts for job, workflow or node events. Allows for initializing
event fields and sending start/end events.
"""

from contextlib import contextmanager
from functools import wraps
from typing import Mapping
from typing import Union

from . import global_state
from .initialize_events import init_job
from .initialize_events import init_node
from .initialize_events import init_workflow
from .send_events import ExecInfoType
from .send_events import send_job_event
from .send_events import send_workflow_event

RawExecInfoType = Union[Mapping, bool, str, None]


[docs] def job_decorator(**static_job_info): def _job_decorator(method): @wraps(method) def wrapper(*args, execinfo: RawExecInfoType = None, **kw): with job_context(execinfo, **static_job_info) as execinfo: return method(*args, execinfo=execinfo, **kw) return wrapper return _job_decorator
[docs] @contextmanager def job_context(execinfo: RawExecInfoType, **static_job_info) -> ExecInfoType: if execinfo is None: execinfo = global_state.ENABLE_EWOKS_EVENTS_BY_DEFAULT if isinstance(execinfo, str): execinfo = {"job_id": execinfo} elif isinstance(execinfo, bool): if execinfo: execinfo = dict() else: execinfo = None elif execinfo is None: pass elif not isinstance(execinfo, Mapping): raise TypeError execinfo = init_job(execinfo, **static_job_info) if execinfo is None: yield None else: with _context(execinfo, "job", send_job_event, execinfo["job_id"]) as execinfo: yield execinfo
[docs] @contextmanager def workflow_context(execinfo: ExecInfoType, **kw) -> ExecInfoType: execinfo = init_workflow(execinfo, **kw) if execinfo is None: yield None else: with _context( execinfo, "workflow", send_workflow_event, execinfo["workflow_id"] ) as execinfo: yield execinfo
[docs] @contextmanager def node_context(execinfo: ExecInfoType, **kw) -> ExecInfoType: yield init_node(execinfo, **kw)
@contextmanager def _context( execinfo: ExecInfoType, context, send_context_event, obj_id ) -> ExecInfoType: contexts = execinfo.get("contexts") if contexts is None: contexts = {"job": list(), "workflow": list()} execinfo["contexts"] = contexts obj_ids = contexts[context] first_context = obj_id not in obj_ids if first_context: send_context_event(execinfo=execinfo, event="start") obj_ids.append(obj_id) try: yield execinfo except BaseException as e: if not execinfo.get("exception"): execinfo["exception"] = e raise finally: if first_context: obj_ids.remove(obj_id) send_context_event(execinfo=execinfo, event="end")