from pathlib import Path
from typing import Any, Optional, List, Union, Dict
from .task import Task
from .node import NodeIdType
from .graph import load_graph as _load_graph
from .graph import TaskGraph
from .graph.execute import sequential
from .graph.graph_io import update_default_inputs
from .graph.serialize import GraphRepresentation
from .events import job_decorator as execute_graph_decorator
__all__ = [
"execute_graph",
"load_graph",
"save_graph",
"convert_graph",
"graph_is_supported",
"execute_graph_decorator",
]
[docs]
def load_graph(
graph: Any,
inputs: Optional[List[dict]] = None,
representation: Optional[Union[GraphRepresentation, str]] = None,
root_dir: Optional[Union[str, Path]] = None,
root_module: Optional[str] = None,
) -> TaskGraph:
taskgraph = _load_graph(
source=graph,
representation=representation,
root_dir=root_dir,
root_module=root_module,
)
if inputs:
update_default_inputs(taskgraph.graph, inputs)
return taskgraph
[docs]
def save_graph(
graph: TaskGraph,
destination,
representation: Optional[Union[GraphRepresentation, str]] = None,
**save_options,
) -> Union[str, dict]:
return graph.dump(destination, representation=representation, **save_options)
[docs]
def convert_graph(
source,
destination,
inputs: Optional[List[dict]] = None,
load_options: Optional[dict] = None,
save_options: Optional[dict] = None,
) -> Union[str, dict]:
if load_options is None:
load_options = dict()
if save_options is None:
save_options = dict()
graph = load_graph(source, inputs=inputs, **load_options)
return save_graph(graph, destination, **save_options)
[docs]
@execute_graph_decorator(engine="core")
def execute_graph(
graph,
inputs: Optional[List[dict]] = None,
load_options: Optional[dict] = None,
varinfo: Optional[dict] = None,
execinfo: Optional[dict] = None,
task_options: Optional[dict] = None,
raise_on_error: Optional[bool] = True,
outputs: Optional[List[dict]] = None,
merge_outputs: Optional[bool] = True,
output_tasks: Optional[bool] = False,
) -> Union[Dict[NodeIdType, Task], Dict[str, Any]]:
if load_options is None:
load_options = dict()
taskgraph = load_graph(graph, inputs=inputs, **load_options)
return sequential.execute_graph(
taskgraph.graph,
varinfo=varinfo,
execinfo=execinfo,
task_options=task_options,
raise_on_error=raise_on_error,
outputs=outputs,
merge_outputs=merge_outputs,
output_tasks=output_tasks,
)
[docs]
def graph_is_supported(graph: TaskGraph) -> bool:
return not graph.is_cyclic and not graph.has_conditional_links