Source code for ewokscore.ppftasks

from .task import Task
from ewoksutils.import_utils import import_method

# Name of the task input that holds the full name of the method to import
# and call.
METHOD_ARGUMENT = "_method"
# Name of the task input/output that holds the dictionary which is passed
# from one ppf task to the next.
PPF_DICT_ARGUMENT = "_ppfdict"


class PpfMethodExecutorTask(
    Task,
    input_names=[METHOD_ARGUMENT],
    optional_input_names=[PPF_DICT_ARGUMENT],
    output_names=[PPF_DICT_ARGUMENT],
):
    """Call an imported method with the dictionary ppf workflows pass around.

    Ppf workflows pass one dictionary around between tasks and this
    dictionary gets updated by each task. The dictionary is merged into the
    other input values, unpacked into keyword arguments and passed to the
    method named by the ``_method`` input.
    """

    METHOD_ARGUMENT = METHOD_ARGUMENT
    PPF_DICT_ARGUMENT = PPF_DICT_ARGUMENT

    def run(self):
        """Import the method, call it and publish the updated dictionary.

        The input values are used as the keyword arguments of the method,
        after removing the ``_method`` entry and replacing the ``_ppfdict``
        entry by its own content. Whatever mapping the method returns is
        merged on top of those keyword arguments and the result is stored in
        the ``_ppfdict`` output.
        """
        method_kwargs = self.get_input_values()
        fullname = method_kwargs.pop(self.METHOD_ARGUMENT)
        method = import_method(fullname)

        # The incoming dictionary is flattened into the keyword arguments;
        # its entries take precedence over equally named inputs.
        ppfdict = method_kwargs.pop(self.PPF_DICT_ARGUMENT, None)
        if ppfdict:
            method_kwargs.update(ppfdict)

        result = method(**method_kwargs)

        # A method without an explicit return value yields ``None``; treat
        # that as "no updates" instead of failing in ``dict.update``.
        if result is not None:
            method_kwargs.update(result)
        self.outputs._ppfdict = method_kwargs
class PpfPortTask(
    Task,
    optional_input_names=[PPF_DICT_ARGUMENT],
    output_names=[PPF_DICT_ARGUMENT],
):
    """Identity ppf task: forwards its inputs without calling any method."""

    PPF_DICT_ARGUMENT = PPF_DICT_ARGUMENT

    def run(self):
        """Merge the incoming ``_ppfdict`` into the inputs and output them.

        The ``_ppfdict`` entry is removed from the input values and, when it
        is non-empty, its content is merged into the remaining values. The
        merged dictionary becomes the ``_ppfdict`` output.
        """
        forwarded = self.get_input_values()
        incoming = forwarded.pop(self.PPF_DICT_ARGUMENT, None)
        # A missing or empty dictionary contributes nothing to the merge.
        forwarded.update(incoming or {})
        self.outputs._ppfdict = forwarded