Source code for ewokscore.workflow_discovery

import sys
from fnmatch import fnmatch
from typing import Generator
from typing import List
from typing import Optional

if sys.version_info < (3, 9):
    import importlib_resources
else:
    import importlib.resources as importlib_resources


from .entry_points import entry_points


[docs] def discover_all_workflows( workflow_extension: Optional[str] = None, raise_import_failure: bool = True, ) -> List[str]: """ Generates the full python qualifier names of the workflows as defined by entry-point group "ewoks.workflows". """ return list( _iter_discover_all_workflows( workflow_extension=workflow_extension, raise_import_failure=raise_import_failure, ) )
[docs] def discover_workflows_from_package_data( *fq_names_or_patterns: str, workflow_extension: Optional[str] = None, raise_import_failure: bool = True, ) -> List[str]: """ Generates the full python qualifier names of the workflows as defined by the module names/patterns. """ if workflow_extension is None: workflow_extensions = ("json", "yaml") else: workflow_extensions = (workflow_extension,) result = list() for fq_name_or_pattern in fq_names_or_patterns: for workflow_extension in workflow_extensions: result.extend( _iter_fqns_from_pattern( fq_name_or_pattern, file_extension=workflow_extension, raise_import_failure=raise_import_failure, ) ) return result
def _iter_discover_all_workflows( workflow_extension: Optional[str] = None, raise_import_failure: bool = True, ) -> Generator[str, None, None]: visited = set() for entrypoint in entry_points("ewoks.workflows"): module_pattern = entrypoint.name if module_pattern in visited: continue visited.add(module_pattern) yield from discover_workflows_from_package_data( module_pattern, workflow_extension=workflow_extension, raise_import_failure=raise_import_failure, ) def _iter_fqns_from_pattern( fq_name_or_pattern: str, file_extension: str, raise_import_failure: bool = True, ) -> Generator[str, None, None]: if not fq_name_or_pattern: return parts = fq_name_or_pattern.split(".") first = parts[0] rest = parts[1:] yield from _iter_module_pattern_parts( current_package=first, remaining_parts=rest, file_extension=file_extension, raise_import_failure=raise_import_failure, ) def _iter_module_pattern_parts( current_package: str, remaining_parts: List[str], file_extension: str, raise_import_failure: bool = True, ) -> Generator[str, None, None]: if not remaining_parts: return part = remaining_parts[0] tail = remaining_parts[1:] try: files = importlib_resources.files(current_package) except Exception: if raise_import_failure: raise return # Last component: match workflow resource files if not tail: pattern = f"{part}.{file_extension}" for resource in files.iterdir(): if not resource.is_file(): continue if fnmatch(resource.name, pattern): stem = resource.name[: -(len(file_extension) + 1)] yield f"{current_package}.{stem}" return # Intermediate package matching for resource in files.iterdir(): if not resource.is_dir(): continue if not fnmatch(resource.name, part): continue subpackage = f"{current_package}.{resource.name}" # Ensure it is importable as a package try: importlib_resources.files(subpackage) except Exception: if raise_import_failure: raise continue yield from _iter_module_pattern_parts( current_package=subpackage, remaining_parts=tail, file_extension=file_extension, raise_import_failure=raise_import_failure, )