Walk directory tree and produce structural GraphNode/GraphRelationship objects.
(
inp: PipelineInput,
ctx: PipelineContext,
out: StageResult[ScanResult],
)
| 48 | |
| 49 | |
| 50 | def scanning( |
| 51 | inp: PipelineInput, |
| 52 | ctx: PipelineContext, |
| 53 | out: StageResult[ScanResult], |
| 54 | ) -> Generator[PipelineEvent, None, None]: |
| 55 | """Walk directory tree and produce structural GraphNode/GraphRelationship objects.""" |
| 56 | yield PipelineEvent( |
| 57 | kind=EventKind.STAGE_START, |
| 58 | phase=Phase.SCANNING, |
| 59 | message="Scanning directory tree", |
| 60 | ) |
| 61 | |
| 62 | root_path = Path(inp.path) if inp.path else None |
| 63 | if root_path is None: |
| 64 | yield PipelineEvent( |
| 65 | kind=EventKind.ERROR, |
| 66 | phase=Phase.SCANNING, |
| 67 | message="No path provided", |
| 68 | errors=["PipelineInput.path is required for local scanning"], |
| 69 | ) |
| 70 | return |
| 71 | |
| 72 | root_path = root_path.resolve() |
| 73 | repo_id = inp.repo_id or root_path.name |
| 74 | repo_url = inp.repo_url |
| 75 | ref = inp.ref |
| 76 | provider = inp.provider |
| 77 | |
| 78 | if ctx.cancelled: |
| 79 | return |
| 80 | |
| 81 | # Walk directory tree → flat nodes/rels |
| 82 | walker = DirectoryWalker() |
| 83 | walk = walker.walk( |
| 84 | root_path=root_path, |
| 85 | repo_id=repo_id, |
| 86 | repo_name=root_path.name, |
| 87 | ) |
| 88 | |
| 89 | if ctx.cancelled: |
| 90 | return |
| 91 | |
| 92 | nodes = walk.nodes |
| 93 | rels = walk.relationships |
| 94 | |
| 95 | # Enrich Repository node with ref/sourceUri/provider |
| 96 | for node in nodes: |
| 97 | if node.type == "Repository": |
| 98 | props = dict(node.properties or {}) |
| 99 | props["summary"] = f"Source code repository for {node.name}" |
| 100 | if ref: |
| 101 | props["ref"] = ref |
| 102 | if repo_url: |
| 103 | props["sourceUri"] = repo_url |
| 104 | if provider: |
| 105 | props["provider"] = provider |
| 106 | nodes[nodes.index(node)] = GraphNode( |
| 107 | id=node.id, |