Load a subgraph definition from disk. Returns a tuple of (graph_dict, resolved_path).
(file_path: str, *, parent_source: str | None = None)
| 61 | |
| 62 | |
| 63 | def load_subgraph_config(file_path: str, *, parent_source: str | None = None) -> Tuple[Dict[str, Any], Dict[str, Any], str]: |
| 64 | """Load a subgraph definition from disk. |
| 65 | |
| 66 | Returns a tuple of (graph_dict, resolved_path). |
| 67 | """ |
| 68 | |
| 69 | candidates = _resolve_candidate_paths(file_path, parent_source) |
| 70 | resolved_path = _resolve_existing_path(candidates).resolve() |
| 71 | |
| 72 | if resolved_path not in _SUBGRAPH_CACHE: |
| 73 | _SUBGRAPH_CACHE[resolved_path] = _load_graph_dict(resolved_path) |
| 74 | |
| 75 | payload = _SUBGRAPH_CACHE[resolved_path] |
| 76 | graph_dict = deepcopy(payload["graph"]) |
| 77 | vars_dict = dict(payload["vars"]) |
| 78 | return graph_dict, vars_dict, str(resolved_path) |
| 79 | |
| 80 | |
| 81 | __all__ = ["load_subgraph_config"] |
no test coverage detected