MCPcopy
hub / github.com/grafana/tempo / consumeAndCombineResponses

Function consumeAndCombineResponses

modules/frontend/pipeline/collector_http.go:67–134  ·  view source on GitHub ↗
(ctx context.Context, consumers int, resps Responses[combiner.PipelineResponse], c combiner.Combiner, callback func() error)

Source from the content-addressed store, hash-verified

65}
66
67func consumeAndCombineResponses(ctx context.Context, consumers int, resps Responses[combiner.PipelineResponse], c combiner.Combiner, callback func() error) error {
68 respChan := make(chan combiner.PipelineResponse)
69 overallErr := atomic.Error{}
70 wg := sync.WaitGroup{}
71
72 setErr := func(err error) {
73 overallErr.CompareAndSwap(nil, err)
74 }
75
76 if consumers <= 0 {
77 consumers = 10
78 }
79
80 for i := 0; i < consumers; i++ {
81 wg.Add(1)
82 go func() {
83 defer wg.Done()
84 for resp := range respChan {
85 err := c.AddResponse(resp)
86 if err != nil {
87 setErr(err)
88 }
89 }
90 }()
91 }
92
93 for {
94 if ctx.Err() != nil {
95 setErr(ctx.Err())
96 break
97 }
98
99 resp, done, err := resps.Next(ctx)
100 if err != nil {
101 setErr(err)
102 break
103 }
104
105 if resp != nil {
106 respChan <- resp
107 }
108
109 if overallErr.Load() != nil {
110 break
111 }
112
113 if c.ShouldQuit() {
114 break
115 }
116
117 if done {
118 break
119 }
120
121 if callback != nil {
122 err = callback()
123 if err != nil {
124 setErr(err)

Callers 2

RoundTripMethod · 0.85
RoundTripMethod · 0.85

Calls 7

AddMethod · 0.65
DoneMethod · 0.65
AddResponseMethod · 0.65
NextMethod · 0.65
ShouldQuitMethod · 0.65
WaitMethod · 0.65
callbackFunction · 0.50

Tested by

no test coverage detected