Source code for ewokscore.tests.test_events
import logging
from contextlib import contextmanager
from logging.handlers import QueueHandler
from queue import Empty
from queue import Queue
import pytest
from ewokscore import events
[docs]
@contextmanager
def capture_events(blocking):
queue = Queue()
handler = QueueHandler(queue)
events.add_handler(handler, blocking)
def get_event():
try:
return queue.get(block=blocking, timeout=1)
except Empty:
raise RuntimeError("event not received by handler") from None
try:
yield get_event
finally:
events.cleanup()
[docs]
@pytest.mark.parametrize("blocking", [False, True])
def test_workflow_event(blocking):
execinfo = {
"job_id": None,
"host_name": None,
"user_name": None,
"process_id": None,
"workflow_id": None,
}
with capture_events(blocking) as get_event:
events.send_workflow_event(execinfo=execinfo, event="start")
event = get_event()
assert event.type == "start"
events.send_workflow_event(execinfo=execinfo, event="end", error_message="abc")
event = get_event()
assert event.type == "end"
assert event.error
assert event.error_message == "abc"
[docs]
@pytest.mark.parametrize("blocking", [False, True])
def test_task_event(blocking):
execinfo = {
"job_id": None,
"host_name": None,
"user_name": None,
"process_id": None,
"workflow_id": None,
"node_id": None,
"task_id": None,
}
with capture_events(blocking) as get_event:
events.send_task_event(
execinfo=execinfo,
event="start",
)
event = get_event()
assert event.type == "start"
events.send_task_event(
execinfo=execinfo,
event="progress",
progress=50,
)
event = get_event()
assert event.type == "progress"
assert event.progress == 50
events.send_task_event(
execinfo=execinfo,
event="end",
)
event = get_event()
assert event.type == "end"
assert not event.error
assert event.error_message is None
[docs]
@pytest.mark.parametrize("blocking", [False, True])
def test_root_logger(blocking, caplog):
execinfo = {
"job_id": None,
"host_name": None,
"user_name": None,
"process_id": None,
"workflow_id": None,
}
with capture_events(blocking) as get_event:
with caplog.at_level(logging.WARNING):
events.send_workflow_event(execinfo=execinfo, event="start")
event = get_event()
assert event.type == "start"
assert not caplog.records
with caplog.at_level(logging.INFO):
events.send_workflow_event(execinfo=execinfo, event="start")
event = get_event()
assert event.type == "start"
assert len(caplog.records) == 1
event_root = caplog.records[0]
assert event_root.type == "start"