MCPcopy
hub / github.com/opentrace/opentrace / collect_pipeline

Function collect_pipeline

agent/src/opentrace_agent/pipeline/pipeline.py:81–98  ·  view source on GitHub ↗

Convenience for tests — consume all events, return aggregated results.

(
    inp: PipelineInput,
    ctx: PipelineContext | None = None,
    store: Store | None = None,
)

Source from the content-addressed store, hash-verified

79
80
81def collect_pipeline(
82 inp: PipelineInput,
83 ctx: PipelineContext | None = None,
84 store: Store | None = None,
85) -> tuple[list[PipelineEvent], list[GraphNode], list[GraphRelationship]]:
86 """Convenience for tests — consume all events, return aggregated results."""
87 events: list[PipelineEvent] = []
88 all_nodes: list[GraphNode] = []
89 all_rels: list[GraphRelationship] = []
90
91 for event in run_pipeline(inp, ctx, store):
92 events.append(event)
93 if event.nodes:
94 all_nodes.extend(event.nodes)
95 if event.relationships:
96 all_rels.extend(event.relationships)
97
98 return events, all_nodes, all_rels

Calls 1

run_pipelineFunction · 0.85