Source code for ewokscore.ppftasks

from typing import Mapping

from ewoksutils.import_utils import import_method

from .task import Task

METHOD_ARGUMENT = "_method"
PPF_DICT_ARGUMENT = "_ppfdict"


[docs] class PpfMethodExecutorTask( Task, input_names=[METHOD_ARGUMENT], optional_input_names=[PPF_DICT_ARGUMENT], output_names=[PPF_DICT_ARGUMENT], ): """Ppf workflows pass one dictionary around between tasks and this dictionary gets updates by each task. This dictionary is unpacked into the unexpected arguments and passed to the method. """ METHOD_ARGUMENT = METHOD_ARGUMENT PPF_DICT_ARGUMENT = PPF_DICT_ARGUMENT def _get_task_identifier(self, inputs: Mapping) -> str: return inputs.get(self.METHOD_ARGUMENT, self.class_registry_name())
[docs] def run(self): method_kwargs = self.get_input_values() fullname = method_kwargs.pop(self.METHOD_ARGUMENT) method = import_method(fullname) ppfdict = method_kwargs.pop(self.PPF_DICT_ARGUMENT, None) if ppfdict: method_kwargs.update(ppfdict) result = method(**method_kwargs) method_kwargs.update(result) self.outputs._ppfdict = method_kwargs
[docs] class PpfPortTask( Task, optional_input_names=[PPF_DICT_ARGUMENT], output_names=[PPF_DICT_ARGUMENT] ): """A ppfmethod which represents the identity mapping""" PPF_DICT_ARGUMENT = PPF_DICT_ARGUMENT def _get_task_identifier(self, inputs: Mapping) -> str: return ""
[docs] def run(self): method_kwargs = self.get_input_values() ppfdict = method_kwargs.pop(self.PPF_DICT_ARGUMENT, None) if ppfdict: method_kwargs.update(ppfdict) self.outputs._ppfdict = method_kwargs