Source code for ewokscore.bindings

from pathlib import Path
from typing import Any, Optional, List, Union, Dict

from .task import Task
from .node import NodeIdType
from .graph import load_graph as _load_graph
from .graph import TaskGraph
from .graph.execute import sequential
from .graph.graph_io import update_default_inputs
from .graph.serialize import GraphRepresentation
from .events import job_decorator as execute_graph_decorator

__all__ = [
    "execute_graph",
    "load_graph",
    "save_graph",
    "convert_graph",
    "graph_is_supported",
    "execute_graph_decorator",
]


[docs] def load_graph( graph: Any, inputs: Optional[List[dict]] = None, representation: Optional[Union[GraphRepresentation, str]] = None, root_dir: Optional[Union[str, Path]] = None, root_module: Optional[str] = None, ) -> TaskGraph: taskgraph = _load_graph( source=graph, representation=representation, root_dir=root_dir, root_module=root_module, ) if inputs: update_default_inputs(taskgraph.graph, inputs) return taskgraph
[docs] def save_graph( graph: TaskGraph, destination, representation: Optional[Union[GraphRepresentation, str]] = None, **save_options, ) -> Union[str, dict]: return graph.dump(destination, representation=representation, **save_options)
[docs] def convert_graph( source, destination, inputs: Optional[List[dict]] = None, load_options: Optional[dict] = None, save_options: Optional[dict] = None, ) -> Union[str, dict]: if load_options is None: load_options = dict() if save_options is None: save_options = dict() graph = load_graph(source, inputs=inputs, **load_options) return save_graph(graph, destination, **save_options)
[docs] @execute_graph_decorator(engine="core") def execute_graph( graph, inputs: Optional[List[dict]] = None, load_options: Optional[dict] = None, varinfo: Optional[dict] = None, execinfo: Optional[dict] = None, task_options: Optional[dict] = None, raise_on_error: Optional[bool] = True, outputs: Optional[List[dict]] = None, merge_outputs: Optional[bool] = True, output_tasks: Optional[bool] = False, ) -> Union[Dict[NodeIdType, Task], Dict[str, Any]]: if load_options is None: load_options = dict() taskgraph = load_graph(graph, inputs=inputs, **load_options) return sequential.execute_graph( taskgraph.graph, varinfo=varinfo, execinfo=execinfo, task_options=task_options, raise_on_error=raise_on_error, outputs=outputs, merge_outputs=merge_outputs, output_tasks=output_tasks, )
[docs] def graph_is_supported(graph: TaskGraph) -> bool: return not graph.is_cyclic and not graph.has_conditional_links