Run the four-stage pipeline with atomic-write staging. Writes to ``<db_path>.staging`` (seeded from the live DB so prior repos are preserved) and atomically renames over ``<db_path>`` on success. Holds an exclusive flock on ``<db>.indexlock`` so two concurrent indexes can't race the swap.
(
*,
source_path: Path,
repo_id: str,
db_path: str,
batch_size: int,
verbose: bool,
extra_metadata: dict[str, object] | None = None,
on_event: "Callable[[object], None] | None" = None,
)
| 354 | |
| 355 | |
| 356 | def _run_indexing_pipeline( |
| 357 | *, |
| 358 | source_path: Path, |
| 359 | repo_id: str, |
| 360 | db_path: str, |
| 361 | batch_size: int, |
| 362 | verbose: bool, |
| 363 | extra_metadata: dict[str, object] | None = None, |
| 364 | on_event: "Callable[[object], None] | None" = None, |
| 365 | ) -> float: |
| 366 | """Run the four-stage pipeline with atomic-write staging. |
| 367 | |
| 368 | Writes to ``<db_path>.staging`` (seeded from the live DB so prior repos are |
| 369 | preserved) and atomically renames over ``<db_path>`` on success. Holds an |
| 370 | exclusive flock on ``<db>.indexlock`` so two concurrent indexes can't race |
| 371 | the swap. *extra_metadata* is merged on top of the auto-collected metadata |
| 372 | before persistence. Returns elapsed seconds. |
| 373 | |
| 374 | ``on_event`` (optional): invoked for every PipelineEvent yielded by the |
| 375 | inner pipeline. Used by the serve.py /api/index-url worker to publish |
| 376 | live progress (phase / current / total / nodes / edges) so the UI's |
| 377 | polling endpoint can report meaningful numbers instead of zeros. |
| 378 | Exceptions raised in the callback are swallowed — observability must |
| 379 | never crash the pipeline. |
| 380 | """ |
| 381 | from opentrace_agent.pipeline import PipelineInput, run_pipeline |
| 382 | from opentrace_agent.pipeline.adapters import GraphStoreAdapter |
| 383 | from opentrace_agent.store import GraphStore |
| 384 | |
| 385 | db_dir = Path(db_path).parent |
| 386 | db_dir.mkdir(parents=True, exist_ok=True) |
| 387 | _ensure_gitignore(db_dir) |
| 388 | |
| 389 | # Staging file avoids contending with readers (MCP) holding the live DB lock. |
| 390 | staging_db = db_path + ".staging" |
| 391 | |
| 392 | lock_fh = _acquire_index_lock(db_path) |
| 393 | try: |
| 394 | _clean_stale_staging(staging_db) |
| 395 | |
| 396 | click.echo(f"Opening staging database at {staging_db} ...") |
| 397 | try: |
| 398 | _seed_staging_from_live(db_path, staging_db) |
| 399 | |
| 400 | with GraphStore(staging_db) as graph_store: |
| 401 | store = GraphStoreAdapter(graph_store, batch_size=batch_size) |
| 402 | |
| 403 | click.echo(f"Indexing {source_path} ...") |
| 404 | t0 = time.monotonic() |
| 405 | |
| 406 | inp = PipelineInput(path=str(source_path), repo_id=repo_id) |
| 407 | |
| 408 | last_result = None |
| 409 | for event in run_pipeline(inp, store=store): |
| 410 | _print_event(event, verbose) |
| 411 | if getattr(event, "result", None) is not None: |
| 412 | last_result = event.result |
| 413 | if on_event is not None: |