Source code for ewokscore.tests.test_dynamic_tasks
from ewoksutils.import_utils import qualname
from ewokscore import Task
from ewokscore.inittask import instantiate_task
[docs]
def task_class_generator(qualname):
    """Return a ``Task`` subclass for the identifier ``qualname``.

    When ``qualname`` is already listed by ``Task.get_subclass_names()``,
    the subclass returned by ``Task.get_subclass(qualname)`` is reused.
    Otherwise a new ``DynamicTask`` class is declared with
    ``registry_name=qualname`` and a single output named ``"result"``;
    its ``run`` method stores ``qualname`` in that output.

    Note: the parameter name shadows the ``qualname`` helper imported at
    module level, but only inside this function.

    :param qualname: identifier used both as the registry name and as the
        value written to the ``result`` output.
    :returns: the task class (not an instance).
    """
    known_names = Task.get_subclass_names()
    if qualname not in known_names:

        class DynamicTask(Task, output_names=["result"], registry_name=qualname):
            def run(self):
                # The only thing this task does is echo its own identifier.
                self.outputs.result = qualname

        return DynamicTask

    # A class was already declared under this name: hand it back as-is.
    return Task.get_subclass(qualname)
[docs]
def test_task_class_generator():
    """Check that a ``"generated"`` node is instantiated via its generator.

    The node attributes name ``task_class_generator`` (by qualified name) as
    the ``task_generator``; after execution the task's outputs must contain
    the task identifier under ``"result"``.
    """
    identifier = "some.unique.task.name"
    node_attrs = {
        "task_type": "generated",
        "task_identifier": identifier,
        "task_generator": qualname(task_class_generator),
    }

    generated_task = instantiate_task("node_id", node_attrs)
    generated_task.execute()

    expected_outputs = {"result": identifier}
    assert generated_task.get_output_values() == expected_outputs