Source code for ewokscore.tests.examples.graphs.self_trigger
from . import graph
[docs]
@graph
def self_trigger():
    """Build an example workflow in which ``task1`` is linked back to itself.

    The workflow has three nodes (``task1``, ``task2``, ``task3``) that all
    use the same task class. The links are:

    * ``task1`` -> ``task1``: ``result`` feeds input ``b``; the link carries
      the condition that output ``too_small`` is ``True``.
    * ``task1`` -> ``task3``: ``result`` feeds input ``a``.
    * ``task2`` -> ``task3``: ``result`` feeds input ``b``.

    Returns:
        A ``(graph, expected)`` pair as produced by this function body, where
        ``graph`` is a dict with the keys ``"links"`` and ``"nodes"`` and
        ``expected`` is ``None``.
        NOTE(review): the ``graph`` decorator may post-process this value;
        confirm against its definition.
    """
    task_class = "ewokscore.tests.examples.tasks.condsumtask.CondSumTask"

    # Attributes common to every node; unpacked in place so that the key
    # order of each node dictionary matches the explicit literal form.
    shared = {"task_type": "class", "task_identifier": task_class}

    def result_to(input_name):
        # Fresh mapping list per link: send output "result" to `input_name`.
        return [{"source_output": "result", "target_input": input_name}]

    nodes = [
        {
            "id": "task1",
            "default_inputs": [{"name": "a", "value": 1}],
            **shared,
            "force_start_node": True,
        },
        {
            "id": "task2",
            "default_inputs": [{"name": "b", "value": 1}],
            **shared,
        },
        {"id": "task3", **shared},
    ]

    # The self-referencing link on task1 is the only conditional one.
    self_link = {
        "source": "task1",
        "target": "task1",
        "data_mapping": result_to("b"),
        "conditions": [{"source_output": "too_small", "value": True}],
    }
    links = [
        self_link,
        {"source": "task1", "target": "task3", "data_mapping": result_to("a")},
        {"source": "task2", "target": "task3", "data_mapping": result_to("b")},
    ]

    # Cannot be executed with ewokscore execute_graph
    expected = None

    workflow = {"links": links, "nodes": nodes}
    return workflow, expected