Source code for ewokscore.graph.error_handlers

from typing import Mapping
import networkx

from .analysis import node_pure_descendants
from ewokscore.graph import analysis


[docs] def connect_default_error_handlers(graph: networkx.DiGraph) -> networkx.DiGraph: """All nodes without an error handler will be connected to all default error handlers. Default error handlers without predecessors will be removed. """ default_error_handlers = dict() for node_id, attrs in graph.nodes.items(): default_error_node = attrs.pop("default_error_node", False) if not default_error_node: continue link_attrs = attrs.pop("default_error_attributes", None) if not isinstance(link_attrs, Mapping): link_attrs = dict() link_attrs["on_error"] = True if not (set(link_attrs.keys()) & {"map_all_data", "data_mapping"}): link_attrs["map_all_data"] = True default_error_handlers[node_id] = link_attrs # All nodes which are not default error handlers nodes_without_error_handlers = set(graph.nodes.keys()) - set(default_error_handlers) # Remove the nodes that already have an error handler for edge, attrs in graph.edges.items(): source_id = edge[0] if attrs.get("on_error"): if source_id in nodes_without_error_handlers: nodes_without_error_handlers.remove(source_id) # Remove nodes that have any of the default error handlers as ancestor for node_id in set(nodes_without_error_handlers): for ancestor_id in analysis.node_ancestors(graph, node_id): if ancestor_id in default_error_handlers: nodes_without_error_handlers.remove(node_id) break if nodes_without_error_handlers: # Connect to the default error handlers for source_id in nodes_without_error_handlers: for target_id, link_attrs in default_error_handlers.items(): graph.add_edge(source_id, target_id, **link_attrs) else: # Remove the default error handlers and their pure descendants for node_id in default_error_handlers: try: next(graph.predecessors()) except StopIteration: nodes = set(node_pure_descendants(graph, node_id, include_node=True)) graph.remove_nodes_from(nodes)