Source code for ewokscore.graph.serialize

import enum
import importlib
import json
import logging
import os
from collections.abc import Mapping
from pathlib import Path
from typing import Optional
from typing import Tuple
from typing import Union

import networkx
import yaml
from ewoksutils.deprecation_utils import deprecated
from ewoksutils.path_utils import makedirs_from_filename
from packaging.version import Version

from ..node import node_id_from_json
from .models import GraphSource
from .schema import normalize_schema_version

logger = logging.getLogger(__name__)

GraphRepresentation = enum.Enum(
    "GraphRepresentation", "json json_dict json_string json_module yaml test_core"
)

network_x_version = Version(networkx.__version__)


def _require_destination(destination, format_name: str) -> None:
    """Raise ``TypeError`` when a file-based dump has no destination."""
    if destination is None:
        raise TypeError(
            f"Destination should be specified when dumping to {format_name}"
        )


def dump(
    graph: networkx.DiGraph,
    destination: Optional[Union[str, Path]] = None,
    representation: Optional[Union[GraphRepresentation, str]] = None,
    **save_options,
) -> Union[str, Path, dict]:
    """From runtime to persistent representation

    :param graph: the graph to serialize
    :param destination: file name for the ``json`` and ``yaml`` representations,
        dotted ``package.name`` for ``json_module``; not used for ``json_dict``
        and ``json_string``
    :param representation: a :code:`GraphRepresentation` member or its name.
        When omitted it is derived from the extension of :code:`destination`
        (``.json``, ``.yml``, ``.yaml``) or falls back to ``json_dict`` when
        :code:`destination` is not a string or path.
    :param save_options: forwarded to :code:`json.dump`, :code:`json.dumps`
        or :code:`yaml.dump`
    :returns: a dictionary (``json_dict``), a string (``json_string``) or the
        destination that was written to
    :raises TypeError: missing destination, ``test_core`` or an unsupported
        representation
    :raises KeyError: unknown representation name
    """
    if isinstance(representation, str):
        representation = GraphRepresentation.__members__[representation]

    if representation is None:
        if not isinstance(destination, (str, Path)):
            representation = GraphRepresentation.json_dict
        else:
            # Only recognized extensions select a representation; anything
            # else stays ``None`` and ends in the final ``TypeError``.
            lowered_name = str(destination).lower()
            if lowered_name.endswith(".json"):
                representation = GraphRepresentation.json
            elif lowered_name.endswith((".yml", ".yaml")):
                representation = GraphRepresentation.yaml

    if representation is GraphRepresentation.json_dict:
        return _networkx_to_dict(graph)

    if representation is GraphRepresentation.json:
        _require_destination(destination, "json")
        graph_dict = dump(graph)
        makedirs_from_filename(destination)
        save_options.setdefault("indent", 2)
        with open(destination, mode="w") as stream:
            json.dump(graph_dict, stream, **save_options)
        return destination

    if representation is GraphRepresentation.json_string:
        return json.dumps(dump(graph), **save_options)

    if representation is GraphRepresentation.yaml:
        _require_destination(destination, "yaml")
        graph_dict = dump(graph)
        makedirs_from_filename(destination)
        with open(destination, mode="w") as stream:
            yaml.dump(graph_dict, stream, **save_options)
        return destination

    if representation is GraphRepresentation.json_module:
        _require_destination(destination, "json")
        # "package.sub.name" -> "<directory of package.sub>/name.json"
        package_name, _, stem = str(destination).rpartition(".")
        assert (
            package_name
        ), f"No package provided when saving graph to '{destination}'"
        destination = os.path.join(_package_path(package_name), f"{stem}.json")
        return dump(
            graph, destination=destination, representation="json", **save_options
        )

    if representation is GraphRepresentation.test_core:
        raise TypeError("'test_core' workflows representations cannot be saved")

    raise TypeError(representation, type(representation))
def load(
    source: Optional[GraphSource] = None,
    representation: Optional[Union[GraphRepresentation, str]] = None,
    root_dir: Optional[Union[str, Path]] = None,
    root_module: Optional[str] = None,
) -> networkx.DiGraph:
    """From persistent to runtime representation

    :param source: mapping, JSON string, file name, dotted module name,
        networkx graph or an object with a networkx graph as :code:`graph`
        attribute. A falsy source gives an empty graph.
    :param representation: a :code:`GraphRepresentation` member or its name.
        When omitted it is derived from :code:`source`: mappings are
        ``json_dict``, strings containing both ``{`` and ``}`` are
        ``json_string``, other strings and paths are treated as file names.
    :param root_dir: passed on to the file lookup for relative file names
    :param root_module: passed on to the file lookup for relative file names
    :returns: a directed graph
    :raises TypeError: unsupported representation or an undirected graph
    :raises KeyError: unknown representation name
    """
    if isinstance(representation, str):
        representation = GraphRepresentation.__members__[representation]

    lookup = dict(root_dir=root_dir, root_module=root_module)

    if representation is None:
        if isinstance(source, Mapping):
            representation = GraphRepresentation.json_dict
        elif isinstance(source, (str, Path)):
            looks_like_json = (
                isinstance(source, str) and "{" in source and "}" in source
            )
            if looks_like_json:
                representation = GraphRepresentation.json_string
            else:
                lowered_name = str(source).lower()
                if lowered_name.endswith(".json"):
                    representation = GraphRepresentation.json
                elif lowered_name.endswith((".yml", ".yaml")):
                    representation = GraphRepresentation.yaml
                else:
                    # Unknown or missing extension: read the file right away
                    # and continue with its parsed content.
                    source = _read_any_file(source, **lookup)
                    representation = GraphRepresentation.json_dict

    if not source:
        graph = networkx.DiGraph()
    elif isinstance(source, networkx.Graph):
        graph = source
    elif isinstance(getattr(source, "graph", None), networkx.Graph):
        graph = source.graph
    elif representation is GraphRepresentation.json_dict:
        graph = _dict_to_networkx(source)
    elif representation is GraphRepresentation.json:
        graph = _dict_to_networkx(_read_json_file(source, **lookup))
    elif representation is GraphRepresentation.json_string:
        graph = _dict_to_networkx(json_load(source))
    elif representation is GraphRepresentation.yaml:
        graph = _dict_to_networkx(_read_yaml_file(source, **lookup))
    elif representation is GraphRepresentation.json_module:
        # "package.sub.name" -> "<directory of package.sub>/name"
        package_name, _, stem = source.rpartition(".")
        if package_name:
            stem = os.path.join(_package_path(package_name), stem)
        return load(
            stem,
            representation="json",
            root_dir=root_dir,
            root_module=root_module,
        )
    elif representation is GraphRepresentation.test_core:
        # Imported here so the test examples are only needed for "test_core".
        from ..tests.examples.graphs import get_graph

        return load(get_graph(source)[0], representation="json_dict")
    else:
        raise TypeError(representation, type(representation))

    if networkx.is_directed(graph):
        return graph
    raise TypeError(graph, type(graph))
def _read_json_file(
    filename: Union[str, Path],
    root_dir: Optional[Union[str, Path]] = None,
    root_module: Optional[str] = None,
) -> dict:
    """Locate a JSON graph file and return its parsed content."""
    resolved = _find_graph_path(
        filename,
        root_dir=root_dir,
        root_module=root_module,
        possible_extensions=(".json",),
    )
    with open(resolved, mode="r") as stream:
        return json_load(stream)


def _read_yaml_file(
    filename: str, root_dir: Optional[str] = None, root_module: Optional[str] = None
) -> dict:
    """Locate a YAML graph file and return its parsed content."""
    resolved = _find_graph_path(
        filename,
        root_dir=root_dir,
        root_module=root_module,
        possible_extensions=(".yml", ".yaml"),
    )
    with open(resolved, mode="r") as stream:
        return _yaml_load(stream)


def _read_any_file(
    filename: Union[str, Path],
    root_dir: Optional[Union[str, Path]] = None,
    root_module: Optional[str] = None,
) -> Optional[dict]:
    """Locate a graph file of unknown format and return its parsed content.

    The file content is parsed as JSON first and as YAML second.

    :raises ValueError: neither parser accepted the content
    """
    resolved = _find_graph_path(
        filename,
        root_dir=root_dir,
        root_module=root_module,
        possible_extensions=(".json", ".yml", ".yaml"),
    )
    with open(resolved, mode="r") as stream:
        text = stream.read()

    # Each parser comes with the errors that mean "not this format".
    attempts = (
        (json_load, (json.JSONDecodeError, TypeError)),
        (_yaml_load, (yaml.YAMLError, TypeError)),
    )
    for parse, wrong_format_errors in attempts:
        try:
            return parse(text)
        except wrong_format_errors:
            continue
    raise ValueError(f"File format of '{resolved}' not supported")
def json_load(content) -> dict:
    """Parse a JSON graph from a string or from a readable file object.

    Objects are decoded through the Ewoks pairs hook.

    :param content: JSON text or an object accepted by :code:`json.load`
    :raises TypeError: the decoded top-level value is not a mapping
    """
    parse = json.loads if isinstance(content, str) else json.load
    decoded = parse(content, object_pairs_hook=_ewoks_jsonload_hook)
    if isinstance(decoded, Mapping):
        return decoded
    raise TypeError("graph must be a dictionary")
def _yaml_load(content) -> dict:
    """Parse a YAML graph from a string or stream.

    :raises TypeError: the decoded top-level value is not a mapping
    """
    # NOTE(review): ``yaml.Loader`` is the full PyYAML loader and is not meant
    # for untrusted input; confirm whether graph files can come from untrusted
    # sources before keeping it over ``yaml.SafeLoader``.
    decoded = yaml.load(content, yaml.Loader)
    if isinstance(decoded, Mapping):
        return decoded
    raise TypeError("graph must be a dictionary")


# Keys whose values are passed through `node_id_from_json`
_NODE_ID_KEYS = (
    "source",
    "target",
    "sub_source",
    "sub_target",
    "id",
    "node",
    "sub_node",
)


def _ewoks_jsonload_hook_pair(item):
    """Convert the value of one ``(key, value)`` pair when the key holds a node id."""
    name, raw = item
    if name not in _NODE_ID_KEYS:
        return name, raw
    return name, node_id_from_json(raw)
@deprecated("Use 'json_load' instead")
def ewoks_jsonload_hook(items):
    """Deprecated alias of the pairs hook used by :code:`json_load`.

    :param items: iterable of ``(key, value)`` pairs of one JSON object
    :returns: the dictionary built from the converted pairs
    """
    return _ewoks_jsonload_hook(items)
def _ewoks_jsonload_hook(items):
    """Build a dictionary from ``(key, value)`` pairs, converting node ids."""
    return dict(map(_ewoks_jsonload_hook_pair, items))


def _exists_and_not_directory(path: str) -> bool:
    """True when the path exists and can be opened, i.e. is not a directory."""
    return os.path.exists(path) and not os.path.isdir(path)


def _find_graph_path(
    path: Union[str, Path],
    root_dir: Optional[Union[str, Path]] = None,
    root_module: Optional[str] = None,
    possible_extensions: Tuple[str, ...] = tuple(),
) -> str:
    """When the :code:`path` is relative, the parent directory is assumed to be
    (in order of priority):

    * :code:`root_dir`
    * :code:`root_module` directory
    * current working directory

    When :code:`path` is not found it tries to find the path with a different
    extension from :code:`possible_extensions`. Directories are never returned:
    a directory ``name`` next to a file ``name.json`` does not hide the file.

    :param path: could be a relative path, might have no extension
    :param root_dir: in case :code:`path` is relative
    :param root_module: in case :code:`root_dir` is not provided
    :param possible_extensions: in case :code:`path` is not found
    :returns: absolute path of the file that was found
    :raises: FileNotFoundError
    """
    # Make the path absolute
    if not root_dir and root_module:
        root_dir = _package_path(root_module)
    if not os.path.isabs(path) and root_dir:
        path = os.path.join(root_dir, path)
    path = os.path.abspath(path)
    if _exists_and_not_directory(path):
        return path

    # Try different extensions
    root, _ = os.path.splitext(path)
    for new_ext in possible_extensions:
        new_full_path = root + new_ext
        if _exists_and_not_directory(new_full_path):
            return new_full_path
    raise FileNotFoundError(path)


def _package_path(package: str) -> str:
    """Return the first entry of the ``__path__`` of an importable package."""
    module = importlib.import_module(package)
    return module.__path__[0]


def _node_link_kwargs() -> dict:
    """Keyword arguments for the networkx node-link functions.

    From networkx 3.4 on, the name of the edge list is passed explicitly so
    that it stays ``"links"``.
    """
    if network_x_version < Version("3.4rc"):
        return {}
    return {"edges": "links"}


def _dict_to_networkx(graph: dict) -> networkx.DiGraph:
    """Convert a node-link dictionary to a networkx graph.

    The dictionary is completed in-place: missing ``directed``, ``nodes``,
    ``links`` and ``graph`` entries get defaults and a missing graph ``id``
    becomes ``"notspecified"`` (with a warning).
    """
    graph.setdefault("directed", True)
    graph.setdefault("nodes", list())
    graph.setdefault("links", list())
    graph.setdefault("graph", dict())
    if "id" not in graph["graph"]:
        logger.warning('Graph has no "id": use "notspecified"')
        graph["graph"]["id"] = "notspecified"
    normalize_schema_version(graph)
    return networkx.readwrite.json_graph.node_link_graph(
        graph, **_node_link_kwargs()
    )


def _networkx_to_dict(graph: networkx.DiGraph) -> dict:
    """Convert a networkx graph to a node-link dictionary."""
    graph_dict = networkx.readwrite.json_graph.node_link_data(
        graph, **_node_link_kwargs()
    )
    # Remove fields that are not part of the Ewoks spec
    graph_dict.pop("directed")
    graph_dict.pop("multigraph")
    return graph_dict