MCPcopy
hub / github.com/opentrace/opentrace / core_pipeline

Function core_pipeline

agent/src/opentrace_agent/pipeline/pipeline.py:41–64  ·  view source on GitHub ↗

Run scanning → processing → resolving, yielding events throughout.

(
    inp: PipelineInput,
    ctx: PipelineContext,
)

Source from the content-addressed store, hash-verified

39
40
41def core_pipeline(
42 inp: PipelineInput,
43 ctx: PipelineContext,
44) -> Generator[PipelineEvent, None, None]:
45 """Run scanning → processing → resolving, yielding events throughout."""
46 scan_out: StageResult[ScanResult] = StageResult()
47 yield from scanning(inp, ctx, scan_out)
48 if ctx.cancelled or scan_out.value is None:
49 return
50
51 proc_out: StageResult[ProcessingOutput] = StageResult()
52 yield from processing(scan_out.value, ctx, proc_out)
53 if ctx.cancelled or proc_out.value is None:
54 return
55
56 result_out: StageResult[PipelineResult] = StageResult()
57 yield from resolving(proc_out.value, ctx, result_out)
58
59 yield PipelineEvent(
60 kind=EventKind.DONE,
61 phase=Phase.RESOLVING,
62 message="Pipeline complete",
63 result=result_out.value,
64 )
65
66
67def run_pipeline(

Callers 1

run_pipelineFunction · 0.85

Calls 5

StageResultClass · 0.90
scanningFunction · 0.90
processingFunction · 0.90
resolvingFunction · 0.90
PipelineEventClass · 0.90

Tested by

no test coverage detected