Source code for ewokscore.events.initialize_events
"""Initialize ewoks event fields
"""
import os
import getpass
import socket
from uuid import uuid4
from typing import Mapping, Optional, Union
import networkx
ExecInfoType = Union[Mapping, None]
[docs]
def init_job(execinfo: ExecInfoType, **static_job_info) -> ExecInfoType:
    """Build the job-level execution info.

    :param execinfo: execution info mapping, or ``None`` when events are
        not requested.
    :param static_job_info: extra fields merged into the result last, so
        they override any key already present.
    :returns: ``None`` when ``execinfo`` is ``None``; otherwise a new
        ``dict`` copy of ``execinfo`` with the environment fields filled in
        by ``set_environment`` and a ``"job_id"`` entry. A random UUID
        string is generated when ``"job_id"`` is missing or ``None``.
    """
    if execinfo is None:
        return None

    # Work on a shallow copy so the caller's mapping is never mutated.
    job_info = dict(execinfo)
    set_environment(job_info)
    if job_info.get("job_id") is None:
        job_info["job_id"] = str(uuid4())
    job_info.update(static_job_info)
    return job_info
[docs]
def init_workflow(
    execinfo: ExecInfoType,
    workflow: Union[networkx.DiGraph, str, None] = None,
    **static_workflow_info,
) -> ExecInfoType:
    """Build the workflow-level execution info.

    :param execinfo: execution info mapping, or ``None`` when events are
        not requested.
    :param workflow: source of the workflow identifier. ``None`` yields a
        default identifier of the form ``"<host_name>-<process_id>-<user_name>"``;
        a string is used as-is; any other object is treated as a graph
        whose ``graph["id"]`` entry is converted to a string.
    :param static_workflow_info: extra fields merged into the result last,
        so they override any key already present.
    :returns: ``None`` when ``execinfo`` is ``None``; otherwise a new
        ``dict`` copy of ``execinfo`` with the environment fields and
        ``"workflow_id"`` set.
    :raises ValueError: when ``workflow`` is a graph without an ``"id"``
        entry in its ``graph`` attributes.
    """
    if execinfo is None:
        return None

    # Work on a shallow copy so the caller's mapping is never mutated.
    execinfo = dict(execinfo)
    set_environment(execinfo)

    if workflow is None:
        # The environment fields were just written by set_environment.
        execinfo["workflow_id"] = (
            f"{execinfo['host_name']}-{execinfo['process_id']}-{execinfo['user_name']}"
        )
    elif isinstance(workflow, str):
        execinfo["workflow_id"] = workflow
    else:
        # Keep the try body limited to the lookup that can raise KeyError
        # and chain the original error so the traceback shows the cause.
        try:
            graph_id = workflow.graph["id"]
        except KeyError as err:
            raise ValueError("the graph needs an 'id' for execution events") from err
        execinfo["workflow_id"] = str(graph_id)

    execinfo.update(static_workflow_info)
    return execinfo
[docs]
def init_node(
    execinfo: ExecInfoType, node_id: Optional[str], task_id: Optional[str]
) -> ExecInfoType:
    """Build the node-level execution info.

    :param execinfo: execution info mapping, or ``None`` when events are
        not requested.
    :param node_id: stored under ``"node_id"`` when truthy.
    :param task_id: stored under ``"task_id"`` when truthy.
    :returns: ``None`` when ``execinfo`` is ``None``; otherwise a new
        ``dict`` copy of ``execinfo`` with the environment fields filled in
        by ``set_environment`` and the identifiers added.
    """
    if execinfo is None:
        return None

    # Work on a shallow copy so the caller's mapping is never mutated.
    node_info = dict(execinfo)
    set_environment(node_info)
    # Falsy identifiers (None or "") leave any existing entry untouched.
    for field, value in (("node_id", node_id), ("task_id", task_id)):
        if value:
            node_info[field] = value
    return node_info
[docs]
def set_environment(execinfo: ExecInfoType) -> ExecInfoType:
    """Record the current host, process and user in ``execinfo`` in place.

    :param execinfo: mutable execution info mapping, or ``None``.
    :returns: ``None`` when ``execinfo`` is ``None``; otherwise the same
        mapping object that was passed in, after setting ``"host_name"``,
        ``"process_id"`` and ``"user_name"``.
    """
    if execinfo is None:
        return None
    execinfo["host_name"] = socket.gethostname()
    execinfo["process_id"] = os.getpid()
    execinfo["user_name"] = getpass.getuser()
    # Return the mapping so the result matches the declared return type
    # instead of implicitly returning None.
    return execinfo