Source code for ewokscore.tests.test_events

from contextlib import contextmanager
from logging.handlers import QueueHandler
from queue import Queue, Empty
import pytest
from ewokscore import events


[docs] @contextmanager def capture_events(blocking): queue = Queue() handler = QueueHandler(queue) events.add_handler(handler, blocking) def get_event(): try: return queue.get(block=blocking, timeout=1) except Empty: raise RuntimeError("event not received by handler") from None try: yield get_event finally: events.remove_handler(handler) events.cleanup()
[docs] @pytest.mark.parametrize("blocking", [False, True]) def test_workflow_event(blocking): execinfo = { "job_id": None, "host_name": None, "user_name": None, "process_id": None, "workflow_id": None, } with capture_events(blocking) as get_event: events.send_workflow_event(execinfo=execinfo, event="start") event = get_event() assert event.type == "start" events.send_workflow_event(execinfo=execinfo, event="end", error_message="abc") event = get_event() assert event.type == "end" assert event.error assert event.error_message == "abc"
[docs] @pytest.mark.parametrize("blocking", [False, True]) def test_task_event(blocking): execinfo = { "job_id": None, "host_name": None, "user_name": None, "process_id": None, "workflow_id": None, "node_id": None, "task_id": None, } with capture_events(blocking) as get_event: events.send_task_event( execinfo=execinfo, event="start", ) event = get_event() assert event.type == "start" events.send_task_event( execinfo=execinfo, event="progress", progress=50, ) event = get_event() assert event.type == "progress" assert event.progress == 50 events.send_task_event( execinfo=execinfo, event="end", ) event = get_event() assert event.type == "end" assert not event.error assert event.error_message is None