(
self,
node: Node,
workspace: Path,
*,
success: bool,
)
| 87 | self._snapshots[node.id] = snapshot |
| 88 | |
| 89 | def after_node( |
| 90 | self, |
| 91 | node: Node, |
| 92 | workspace: Path, |
| 93 | *, |
| 94 | success: bool, |
| 95 | ) -> None: |
| 96 | if not success or not self.can_handle(node): |
| 97 | self._snapshots.pop(node.id, None) |
| 98 | return |
| 99 | |
| 100 | before = self._snapshots.pop(node.id, {}) |
| 101 | after, truncated = self._snapshot(workspace) |
| 102 | if not after and not self._last_emitted: |
| 103 | return |
| 104 | |
| 105 | changed_paths = [ |
| 106 | Path(path_str) |
| 107 | for path_str, signature in after.items() |
| 108 | if path_str not in before or before[path_str].sha256 != signature.sha256 |
| 109 | ] |
| 110 | |
| 111 | artifacts: List[WorkspaceArtifact] = [] |
| 112 | for relative_path in changed_paths: |
| 113 | signature = after[str(relative_path)] |
| 114 | full_path = workspace / relative_path |
| 115 | if not full_path.exists() or not full_path.is_file(): |
| 116 | continue |
| 117 | try: |
| 118 | tracked = self._last_emitted.get(str(relative_path)) |
| 119 | change_type = "created" if tracked is None else "updated" |
| 120 | record = self._register_artifact( |
| 121 | full_path, |
| 122 | relative_path, |
| 123 | node, |
| 124 | attachment_id=tracked.attachment_id if tracked else None, |
| 125 | ) |
| 126 | except Exception as exc: |
| 127 | self.logger.warning( |
| 128 | "Failed to register artifact %s for node %s: %s", |
| 129 | relative_path, |
| 130 | node.id, |
| 131 | exc, |
| 132 | ) |
| 133 | continue |
| 134 | artifacts.append( |
| 135 | self._to_artifact( |
| 136 | record, |
| 137 | node, |
| 138 | relative_path, |
| 139 | full_path, |
| 140 | change_type=change_type, |
| 141 | ) |
| 142 | ) |
| 143 | self._last_emitted[str(relative_path)] = _TrackedEntry( |
| 144 | sha256=signature.sha256, |
| 145 | attachment_id=record.ref.attachment_id or "", |
| 146 | absolute_path=str(full_path), |
No test coverage detected.