MCPcopy
hub / github.com/opentrace/opentrace / _run_indexing_pipeline

Function _run_indexing_pipeline

agent/src/opentrace_agent/cli/main.py:356–438  ·  view source on GitHub ↗

Run the four-stage pipeline with atomic-write staging. Writes to ``<db_path>.staging`` (seeded from the live DB so prior repos are preserved) and atomically renames over ``<db_path>`` on success. Holds an exclusive flock on ``<db>.indexlock`` so two concurrent indexes can't race the swap.

(
    *,
    source_path: Path,
    repo_id: str,
    db_path: str,
    batch_size: int,
    verbose: bool,
    extra_metadata: dict[str, object] | None = None,
    on_event: "Callable[[object], None] | None" = None,
)

Source from the content-addressed store, hash-verified

354
355
356def _run_indexing_pipeline(
357 *,
358 source_path: Path,
359 repo_id: str,
360 db_path: str,
361 batch_size: int,
362 verbose: bool,
363 extra_metadata: dict[str, object] | None = None,
364 on_event: "Callable[[object], None] | None" = None,
365) -> float:
366 """Run the four-stage pipeline with atomic-write staging.
367
368 Writes to ``<db_path>.staging`` (seeded from the live DB so prior repos are
369 preserved) and atomically renames over ``<db_path>`` on success. Holds an
370 exclusive flock on ``<db>.indexlock`` so two concurrent indexes can't race
371 the swap. *extra_metadata* is merged on top of the auto-collected metadata
372 before persistence. Returns elapsed seconds.
373
374 ``on_event`` (optional): invoked for every PipelineEvent yielded by the
375 inner pipeline. Used by the serve.py /api/index-url worker to publish
376 live progress (phase / current / total / nodes / edges) so the UI's
377 polling endpoint can report meaningful numbers instead of zeros.
378 Exceptions raised in the callback are swallowed — observability must
379 never crash the pipeline.
380 """
381 from opentrace_agent.pipeline import PipelineInput, run_pipeline
382 from opentrace_agent.pipeline.adapters import GraphStoreAdapter
383 from opentrace_agent.store import GraphStore
384
385 db_dir = Path(db_path).parent
386 db_dir.mkdir(parents=True, exist_ok=True)
387 _ensure_gitignore(db_dir)
388
389 # Staging file avoids contending with readers (MCP) holding the live DB lock.
390 staging_db = db_path + ".staging"
391
392 lock_fh = _acquire_index_lock(db_path)
393 try:
394 _clean_stale_staging(staging_db)
395
396 click.echo(f"Opening staging database at {staging_db} ...")
397 try:
398 _seed_staging_from_live(db_path, staging_db)
399
400 with GraphStore(staging_db) as graph_store:
401 store = GraphStoreAdapter(graph_store, batch_size=batch_size)
402
403 click.echo(f"Indexing {source_path} ...")
404 t0 = time.monotonic()
405
406 inp = PipelineInput(path=str(source_path), repo_id=repo_id)
407
408 last_result = None
409 for event in run_pipeline(inp, store=store):
410 _print_event(event, verbose)
411 if getattr(event, "result", None) is not None:
412 last_result = event.result
413 if on_event is not None:

Calls 15

flushMethod · 0.95
GraphStoreClass · 0.90
GraphStoreAdapterClass · 0.90
PipelineInputClass · 0.90
run_pipelineFunction · 0.90
_ensure_gitignoreFunction · 0.85
_acquire_index_lockFunction · 0.85
_clean_stale_stagingFunction · 0.85
_seed_staging_from_liveFunction · 0.85
_print_eventFunction · 0.85
getattrFunction · 0.85
_collect_metadataFunction · 0.85