"""Define and parse ewoks events"""
import traceback
from datetime import datetime
from numbers import Number
from typing import Dict
from typing import List
from typing import Mapping
from typing import Optional
from ewoksutils.event_utils import FIELD_TYPES
from . import global_state
from .initialize_events import ExecInfoType
[docs]
def send_job_event(**kw):
kw = _preprocess_event(**kw)
logargs, logkwargs = _parse_job_event(**kw)
_send_event(*logargs, **logkwargs)
[docs]
def send_workflow_event(**kw):
kw = _preprocess_event(**kw)
logargs, logkwargs = _parse_workflow_event(**kw)
_send_event(*logargs, **logkwargs)
[docs]
def send_task_event(**kw):
kw = _preprocess_event(**kw)
logargs, logkwargs = _parse_task_event(**kw)
_send_event(*logargs, **logkwargs)
[docs]
def timestamp() -> str:
return datetime.now().astimezone().isoformat()
def _preprocess_event(execinfo: ExecInfoType = None, **logkwargs):
if execinfo:
logkwargs = {**execinfo, **logkwargs}
if not logkwargs.get("time"):
logkwargs["time"] = timestamp()
_parse_exception(logkwargs)
return logkwargs
def _parse_exception(logkwargs):
exception = logkwargs.pop("exception", None)
if exception is None:
return
elif isinstance(exception, str):
logkwargs["error"] = True
tb = exception
error_message = _extract_reason_from_traceback(tb)
if not logkwargs.get("error_message"):
logkwargs["error_message"] = error_message
if not logkwargs.get("error_traceback"):
logkwargs["error_traceback"] = tb
elif isinstance(exception, BaseException):
logkwargs["error"] = True
tb = "".join(
traceback.format_exception(
type(exception), exception, exception.__traceback__
)
)
if not logkwargs.get("error_traceback"):
logkwargs["error_traceback"] = tb
if logkwargs.get("context") == "node":
exception = _first_exception(exception)
if not logkwargs.get("error_message"):
error_message = str(exception)
if not error_message:
error_message = type(exception).__name__
logkwargs["error_message"] = error_message
else:
raise TypeError(
"ewoks event field 'exception' should be a string or an exception instance"
)
def _first_exception(exception: BaseException) -> BaseException:
while True:
if exception.__cause__ is not None:
# exception raised from another exception
exception = exception.__cause__
elif exception.__context__ is not None:
# exception raised during handling of another exception
exception = exception.__context__
else:
return exception
def _extract_reason_from_traceback(tb: str) -> str:
for line in reversed(tb.split("\n")):
if ":" in line:
return line
return ""
def _parse_workflow_event(**logkwargs):
event_data, logkwargs = _extract_workflow_fields(**logkwargs)
msg = _validate_event(event_data)
logargs = (
"[job %r] [workflow %r] %s",
event_data["job_id"],
event_data["workflow_id"],
msg,
)
logkwargs["event_data"] = event_data
return logargs, logkwargs
def _parse_job_event(**logkwargs):
event_data, logkwargs = _extract_job_fields(**logkwargs)
msg = _validate_event(event_data)
logargs = ("[job %r] %s", event_data["job_id"], msg)
logkwargs["event_data"] = event_data
return logargs, logkwargs
def _parse_task_event(**logkwargs):
event_data, logkwargs = _extract_task_fields(**logkwargs)
msg = _validate_event(event_data)
logargs = (
"[job %r] [workflow %r] [node %r] [task %r] %s",
event_data["job_id"],
event_data["workflow_id"],
event_data["node_id"],
event_data["task_id"],
msg,
)
logkwargs["event_data"] = event_data
return logargs, logkwargs
def _extract_common_fields(
host_name: str,
process_id: int,
user_name: str,
job_id: str,
event: str,
engine: Optional[str] = None,
time: Optional[str] = None,
error: Optional[bool] = None,
error_message: Optional[str] = None,
error_traceback: Optional[str] = None,
**logkwargs,
):
event_data = {
"host_name": host_name,
"process_id": process_id,
"user_name": user_name,
"job_id": job_id,
"engine": engine,
"type": event,
"time": time,
"error": error,
"error_message": error_message,
"error_traceback": error_traceback,
}
return event_data, logkwargs
def _extract_job_fields(
**logkwargs,
):
event_data, logkwargs = _extract_common_fields(**logkwargs)
event_data["context"] = "job"
return event_data, logkwargs
def _extract_workflow_fields(
workflow_id: str,
**logkwargs,
):
event_data, logkwargs = _extract_job_fields(**logkwargs)
event_data["context"] = "workflow"
event_data["workflow_id"] = workflow_id
return event_data, logkwargs
def _extract_task_fields(
node_id: str,
task_id: str,
progress: Optional[Number] = None,
task_uri: Optional[str] = None,
input_uris: Optional[List[Dict[str, Optional[str]]]] = None,
output_uris: Optional[List[Dict[str, Optional[str]]]] = None,
**logkwargs,
):
event_data, logkwargs = _extract_workflow_fields(**logkwargs)
event_data["context"] = "node"
event_data["node_id"] = node_id
event_data["task_id"] = task_id
event_data["progress"] = progress
event_data["task_uri"] = task_uri
event_data["input_uris"] = input_uris
event_data["output_uris"] = output_uris
return event_data, logkwargs
def _validate_event(event_data: dict) -> str:
event_data.update((field, None) for field in set(FIELD_TYPES) - set(event_data))
event = event_data["type"]
if event_data["context"] == "job":
obj = "job"
elif event_data["context"] == "workflow":
obj = "workflow"
elif event_data["context"] == "node":
obj = "task"
else:
raise ValueError(f"context '{event_data['context']}' is unknown")
if event == "start":
return f"{obj} started"
elif event == "end":
error = bool(
event_data["error"]
or event_data["error_message"]
or event_data["error_traceback"]
)
event_data["error"] = error
if error:
if event_data["error_message"] is None:
event_data["error_message"] = ""
if event_data["error_traceback"] is None:
event_data["error_traceback"] = ""
if not error:
return f"{obj} finished"
elif event_data["error"] and event_data["error_message"]:
return f"{obj} failed ({event_data['error_message']})"
else:
return f"{obj} failed"
elif event == "progress":
if not isinstance(event_data["progress"], Number):
event_data["progress"] = float("nan")
return f"{obj} progress {event_data['progress']}%"
else:
raise ValueError(f"unknown ewoks event type '{event}'")
def _send_event(msg: str, *args, event_data=None, extra=None, contexts=None, **kw):
if not isinstance(event_data, Mapping):
raise TypeError("'event_data' should be a mapping")
if extra:
if not isinstance(extra, Mapping):
raise TypeError("'extra' should be a mapping")
extra = {**extra, **event_data}
else:
extra = event_data
global_state.send(msg, *args, extra=extra, **kw)