from collections import Counter
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
import networkx
from ... import events
from ...inittask import add_dynamic_inputs
from ...inittask import instantiate_task as _instantiate_task
from ...node import NodeIdType
from ...task import Task
from .. import analysis
from .. import graph_io
[docs]
def instantiate_task(graph: networkx.DiGraph, node_id: NodeIdType, **kw) -> Task:
"""Named arguments are dynamic input and Variable config.
Default input from the persistent representation are added internally.
"""
# Dynamic input has priority over default input
nodeattrs = graph.nodes[node_id]
return _instantiate_task(node_id, nodeattrs, **kw)
[docs]
def instantiate_task_static(
graph: networkx.DiGraph,
node_id: NodeIdType,
tasks: Optional[Dict[Task, int]] = None,
varinfo: Optional[dict] = None,
execinfo: Optional[dict] = None,
task_options: Optional[dict] = None,
evict_result_counter: Optional[Dict[NodeIdType, int]] = None,
) -> Task:
"""Instantiate destination task while no access to the dynamic inputs.
Side effect: `tasks` will contain all predecessors.
"""
if analysis.graph_is_cyclic(graph):
raise RuntimeError("cannot execute cyclic graphs with ewokscore")
if tasks is None:
tasks = dict()
if evict_result_counter is None:
evict_result_counter = dict()
# Input from previous tasks (instantiate them if needed)
dynamic_inputs = dict()
for source_id in analysis.node_predecessors(graph, node_id):
source_task = tasks.get(source_id, None)
if source_task is None:
source_task = instantiate_task_static(
graph,
source_id,
tasks=tasks,
varinfo=varinfo,
execinfo=execinfo,
task_options=task_options,
evict_result_counter=evict_result_counter,
)
link_attrs = graph[source_id][node_id]
add_dynamic_inputs(
dynamic_inputs,
link_attrs,
source_task.output_variables,
source_id=source_id,
target_id=node_id,
)
# Evict intermediate results
if evict_result_counter:
evict_result_counter[source_id] -= 1
if evict_result_counter[source_id] == 0:
tasks.pop(source_id)
# Instantiate the requested task
target_task = instantiate_task(
graph,
node_id,
inputs=dynamic_inputs,
varinfo=varinfo,
execinfo=execinfo,
task_options=task_options,
)
tasks[node_id] = target_task
return target_task
[docs]
def successor_counter(graph: networkx.DiGraph) -> Dict[NodeIdType, int]:
nsuccessor = Counter()
for edge in graph.edges:
nsuccessor[edge[0]] += 1
return nsuccessor
[docs]
def execute_graph(
graph: networkx.DiGraph,
varinfo: Optional[dict] = None,
execinfo: Optional[dict] = None,
task_options: Optional[dict] = None,
raise_on_error: Optional[bool] = True,
outputs: Optional[List[dict]] = None,
merge_outputs: Optional[bool] = True,
output_tasks: Optional[bool] = False,
) -> Union[Dict[NodeIdType, Task], Dict[str, Any]]:
"""Sequential execution of DAGs.
When `output_tasks` is `True` the arguments `outputs` and `merge_outputs`
are ignored and instead of returning output values, it returns `Task` instances.
This was introduced for testing.
"""
with events.workflow_context(execinfo, workflow=graph) as execinfo:
if analysis.graph_is_cyclic(graph):
raise RuntimeError("cannot execute cyclic graphs")
if analysis.graph_has_conditional_links(graph):
raise RuntimeError("cannot execute graphs with conditional links")
# Pepare containers for local state
tasks = dict()
if output_tasks:
output_values = None
evict_result_counter = None
else:
outputs = graph_io.parse_outputs(graph, outputs)
output_values = dict()
evict_result_counter = successor_counter(graph)
# Execute in topological order
for node_id in analysis.topological_sort(graph):
task = instantiate_task_static(
graph,
node_id,
tasks=tasks,
varinfo=varinfo,
execinfo=execinfo,
task_options=task_options,
evict_result_counter=evict_result_counter,
)
task.execute(
raise_on_error=raise_on_error,
cleanup_references=evict_result_counter is not None,
)
if execinfo:
execinfo.setdefault("exception", task.exception)
if not output_tasks:
graph_io.add_output_values(
output_values, node_id, task, outputs, merge_outputs=merge_outputs
)
# Return results or Task instances
if output_tasks:
return tasks
else:
return output_values