Filesystem-backed attachment manifest for a workflow execution.
| 57 | |
| 58 | |
| 59 | class AttachmentStore: |
| 60 | """Filesystem-backed attachment manifest for a workflow execution.""" |
| 61 | |
def __init__(self, root_dir: Path | str, inline_size_limit: int = DEFAULT_INLINE_LIMIT) -> None:
    """Set up the store under ``root_dir`` and load its manifest.

    The root directory (including missing parents) is created when absent,
    the in-memory indexes start out empty, and ``_load_manifest`` is then
    called.

    Args:
        root_dir: Directory that holds the store and its
            ``attachments_manifest.json`` file.
        inline_size_limit: Value kept on the instance as
            ``inline_size_limit``.
    """
    self.root = store_root = Path(root_dir)
    self.inline_size_limit = inline_size_limit
    # An already existing directory is not an error (exist_ok=True).
    store_root.mkdir(parents=True, exist_ok=True)
    self.manifest_path = store_root / "attachments_manifest.json"

    # Containers are initialised empty before _load_manifest() is invoked.
    self._records: Dict[str, AttachmentRecord] = {}
    self._persistent_ids: set[str] = set()
    self._hash_index: Dict[str, str] = {}

    self._load_manifest()
| 71 | |
| 72 | def register_file( |
| 73 | self, |
| 74 | file_path: Path | str, |
| 75 | *, |
| 76 | kind: MessageBlockType = MessageBlockType.FILE, |
| 77 | display_name: Optional[str] = None, |
| 78 | mime_type: Optional[str] = None, |
| 79 | attachment_id: Optional[str] = None, |
| 80 | copy_file: bool = True, |
| 81 | description: Optional[str] = None, |
| 82 | extra: Optional[Dict[str, Any]] = None, |
| 83 | persist: bool = True, |
| 84 | deduplicate: bool = False, |
| 85 | ) -> AttachmentRecord: |
| 86 | """Register a local file and return its attachment record.""" |
| 87 | source = Path(file_path) |
| 88 | if not source.exists(): |
| 89 | raise FileNotFoundError(f"Attachment source not found: {source}") |
| 90 | |
| 91 | guessed_mime = mime_type or (mimetypes.guess_type(source.name)[0] or "application/octet-stream") |
| 92 | attachment_id = attachment_id or uuid.uuid4().hex |
| 93 | sha256_source = _sha256_file(source) |
| 94 | |
| 95 | if deduplicate: |
| 96 | existing = self._find_duplicate_by_hash( |
| 97 | sha256_source, |
| 98 | copy_file=copy_file, |
| 99 | source_path=source, |
| 100 | ) |
| 101 | if existing: |
| 102 | return existing |
| 103 | if copy_file: |
| 104 | target_dir = self.root / attachment_id |
| 105 | target_dir.mkdir(parents=True, exist_ok=True) |
| 106 | target_path = target_dir / source.name |
| 107 | shutil.copy2(source, target_path) |
| 108 | else: |
| 109 | target_path = source.resolve() |
| 110 | |
| 111 | size = target_path.stat().st_size |
| 112 | sha256 = sha256_source or _sha256_file(target_path) |
| 113 | data_uri = None |
| 114 | # if size <= self.inline_size_limit: |
| 115 | # data_uri = encode_file_to_data_uri(target_path, guessed_mime) |
| 116 |
no outgoing calls
no test coverage detected