Hello world with validation#

A short script that defines a task with input and output validation, creates a workflow and executes it.

hello_world_validation.py#
from typing import Optional
from typing import Union

from pydantic import Field
from pydantic import model_validator

from ewokscore import execute_graph
from ewokscore.task import BaseInputModel
from ewokscore.task import BaseOutputModel
from ewokscore.task import Task

NumberOrStr = Union[int, float, str]


# Define task signature
class InputModel(BaseInputModel):
    a: NumberOrStr = Field(
        ..., description="First argument to sum.", examples=[10.5, "hello"]
    )
    b: Optional[NumberOrStr] = Field(
        None, description="Second argument to sum.", examples=[20.1, " world"]
    )

    @model_validator(mode="after")
    def check_compatible_types(self):
        if self.b is not None and (isinstance(self.a, str) ^ isinstance(self.b, str)):
            raise ValueError(
                "a and b must be of compatible types (both numbers or both strings)"
            )
        return self


class OutputModel(BaseOutputModel):
    result: NumberOrStr = Field(
        ..., description="Result of the sum.", examples=[30.6, "hello world"]
    )


# Implement a workflow task
class SumTask(Task, input_model=InputModel, output_model=OutputModel):
    def run(self):
        result = self.inputs.a
        if self.inputs.b is not None:
            result += self.inputs.b
        self.outputs.result = result


# Define a workflow with default inputs
graph = {"id": "testworkflow", "schema_version": "1.1"}
nodes = [
    {
        "id": "task1",
        "task_type": "class",
        "task_identifier": "__main__.SumTask",
        "default_inputs": [{"name": "a", "value": 1}],
    },
    {
        "id": "task2",
        "task_type": "class",
        "task_identifier": "__main__.SumTask",
        "default_inputs": [{"name": "b", "value": 1}],
    },
    {
        "id": "task3",
        "task_type": "class",
        "task_identifier": "__main__.SumTask",
        "default_inputs": [{"name": "b", "value": 1}],
    },
]
links = [
    {
        "source": "task1",
        "target": "task2",
        "data_mapping": [{"source_output": "result", "target_input": "a"}],
    },
    {
        "source": "task2",
        "target": "task3",
        "data_mapping": [{"source_output": "result", "target_input": "a"}],
    },
]
workflow = {"graph": graph, "nodes": nodes, "links": links}

# Define task inputs
inputs = [{"id": "task1", "name": "a", "value": 10}]

# Execute a workflow (use a proper Ewoks task scheduler in production)
varinfo = {"root_uri": "/tmp/myresults"}  # optionally save all task outputs
result = execute_graph(workflow, varinfo=varinfo, inputs=inputs)
print(result)