Source code for ewokscore.events.send_events

"""Define and parse ewoks events
"""

import traceback
from numbers import Number
from datetime import datetime
from typing import Dict, List, Mapping, Optional

from ewoksutils.event_utils import FIELD_TYPES

from . import global_state
from .initialize_events import ExecInfoType


[docs] def send_job_event(**kw): kw = _preprocess_event(**kw) logargs, logkwargs = _parse_job_event(**kw) _send_event(*logargs, **logkwargs)
[docs] def send_workflow_event(**kw): kw = _preprocess_event(**kw) logargs, logkwargs = _parse_workflow_event(**kw) _send_event(*logargs, **logkwargs)
[docs] def send_task_event(**kw): kw = _preprocess_event(**kw) logargs, logkwargs = _parse_task_event(**kw) _send_event(*logargs, **logkwargs)
[docs] def timestamp() -> str: return datetime.now().astimezone().isoformat()
def _preprocess_event(execinfo: ExecInfoType = None, **logkwargs): if execinfo: logkwargs = {**execinfo, **logkwargs} if not logkwargs.get("time"): logkwargs["time"] = timestamp() _parse_exception(logkwargs) return logkwargs def _parse_exception(logkwargs): exception = logkwargs.pop("exception", None) if exception is None: return elif isinstance(exception, str): logkwargs["error"] = True tb = exception error_message = _extract_reason_from_traceback(tb) if not logkwargs.get("error_message"): logkwargs["error_message"] = error_message if not logkwargs.get("error_traceback"): logkwargs["error_traceback"] = tb elif isinstance(exception, BaseException): logkwargs["error"] = True tb = "".join( traceback.format_exception( type(exception), exception, exception.__traceback__ ) ) if not logkwargs.get("error_traceback"): logkwargs["error_traceback"] = tb if logkwargs.get("context") == "node": exception = _first_exception(exception) if not logkwargs.get("error_message"): error_message = str(exception) if not error_message: error_message = type(exception).__name__ logkwargs["error_message"] = error_message else: raise TypeError( "ewoks event field 'exception' should be a string or an exception instance" ) def _first_exception(exception: BaseException) -> BaseException: while True: if exception.__cause__ is not None: # exception raised from another exception exception = exception.__cause__ elif exception.__context__ is not None: # exception raised during handling of another exception exception = exception.__context__ else: return exception def _extract_reason_from_traceback(tb: str) -> str: for line in reversed(tb.split("\n")): if ":" in line: return line return "" def _parse_workflow_event(**logkwargs): event_data, logkwargs = _extract_workflow_fields(**logkwargs) msg = _validate_event(event_data) logargs = ( "[job %r] [workflow %r] %s", event_data["job_id"], event_data["workflow_id"], msg, ) logkwargs["event_data"] = event_data return logargs, logkwargs def _parse_job_event(**logkwargs): event_data, logkwargs = _extract_job_fields(**logkwargs) msg = _validate_event(event_data) logargs = ("[job %r] %s", event_data["job_id"], msg) logkwargs["event_data"] = event_data return logargs, logkwargs def _parse_task_event(**logkwargs): event_data, logkwargs = _extract_task_fields(**logkwargs) msg = _validate_event(event_data) logargs = ( "[job %r] [workflow %r] [node %r] [task %r] %s", event_data["job_id"], event_data["workflow_id"], event_data["node_id"], event_data["task_id"], msg, ) logkwargs["event_data"] = event_data return logargs, logkwargs def _extract_common_fields( host_name: str, process_id: int, user_name: str, job_id: str, event: str, engine: Optional[str] = None, time: Optional[str] = None, error: Optional[bool] = None, error_message: Optional[str] = None, error_traceback: Optional[str] = None, **logkwargs, ): event_data = { "host_name": host_name, "process_id": process_id, "user_name": user_name, "job_id": job_id, "engine": engine, "type": event, "time": time, "error": error, "error_message": error_message, "error_traceback": error_traceback, } return event_data, logkwargs def _extract_job_fields( **logkwargs, ): event_data, logkwargs = _extract_common_fields(**logkwargs) event_data["context"] = "job" return event_data, logkwargs def _extract_workflow_fields( workflow_id: str, **logkwargs, ): event_data, logkwargs = _extract_job_fields(**logkwargs) event_data["context"] = "workflow" event_data["workflow_id"] = workflow_id return event_data, logkwargs def _extract_task_fields( node_id: str, task_id: str, progress: Optional[Number] = None, task_uri: Optional[str] = None, input_uris: Optional[List[Dict[str, Optional[str]]]] = None, output_uris: Optional[List[Dict[str, Optional[str]]]] = None, **logkwargs, ): event_data, logkwargs = _extract_workflow_fields(**logkwargs) event_data["context"] = "node" event_data["node_id"] = node_id event_data["task_id"] = task_id event_data["progress"] = progress event_data["task_uri"] = task_uri event_data["input_uris"] = input_uris event_data["output_uris"] = output_uris return event_data, logkwargs def _validate_event(event_data: dict) -> str: event_data.update((field, None) for field in set(FIELD_TYPES) - set(event_data)) event = event_data["type"] if event_data["context"] == "job": obj = "job" elif event_data["context"] == "workflow": obj = "workflow" elif event_data["context"] == "node": obj = "task" else: raise ValueError(f"context '{event_data['context']}' is unknown") if event == "start": return f"{obj} started" elif event == "end": error = bool( event_data["error"] or event_data["error_message"] or event_data["error_traceback"] ) event_data["error"] = error if error: if event_data["error_message"] is None: event_data["error_message"] = "" if event_data["error_traceback"] is None: event_data["error_traceback"] = "" if not error: return f"{obj} finished" elif event_data["error"] and event_data["error_message"]: return f"{obj} failed ({event_data['error_message']})" else: return f"{obj} failed" elif event == "progress": if not isinstance(event_data["progress"], Number): event_data["progress"] = float("nan") return f"{obj} progress {event_data['progress']}%" else: raise ValueError(f"unknown ewoks event type '{event}'") def _send_event(msg: str, *args, event_data=None, extra=None, contexts=None, **kw): if not isinstance(event_data, Mapping): raise TypeError("'event_data' should be a mapping") if extra: if not isinstance(extra, Mapping): raise TypeError("'extra' should be a mapping") extra = {**extra, **event_data} else: extra = event_data global_state.send(msg, *args, extra=extra, **kw)