hub / github.com/grafana/tempo / NewAsyncSharderChan

Function NewAsyncSharderChan

modules/frontend/pipeline/async_sharding.go:65–105

NewAsyncSharderChan creates a new AsyncResponse that shards requests to the next AsyncRoundTripper[combiner.PipelineResponse] using a limited number of goroutines.

(ctx context.Context, concurrentReqs int, reqs <-chan Request, resps Responses[combiner.PipelineResponse], next AsyncRoundTripper[combiner.PipelineResponse])
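One detail of the `concurrentReqs` parameter is easy to miss: the guard in the function body compares it with 0 only, so a negative value is not rejected, and the worker loop `for i := 0; i < concurrentReqs; i++` then starts no goroutines at all. A caller that wants to rule this out could normalize the value before passing it in. The following is a minimal sketch of that idea; `clampConcurrency` and `workersStarted` are hypothetical helpers written for illustration and are not part of Tempo.

```go
package main

import "fmt"

// clampConcurrency is a hypothetical caller-side helper (not a Tempo API).
// It returns fallback whenever n is zero or negative.
func clampConcurrency(n, fallback int) int {
	if n < 1 {
		return fallback
	}
	return n
}

// workersStarted counts the iterations of the same loop shape the function
// uses to launch workers: for i := 0; i < n; i++.
func workersStarted(n int) int {
	started := 0
	for i := 0; i < n; i++ {
		started++
	}
	return started
}

func main() {
	fmt.Println(workersStarted(-2))                      // prints 0: no workers for a negative count
	fmt.Println(workersStarted(clampConcurrency(-2, 1))) // prints 1
	fmt.Println(workersStarted(clampConcurrency(4, 1)))  // prints 4
}
```

Whether a fallback or an error is the better response to a bad value depends on the caller; the sketch only shows that the check has to happen before the call.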

Source from the content-addressed store, hash-verified

// NewAsyncSharderChan creates a new AsyncResponse that shards requests to the next AsyncRoundTripper[combiner.PipelineResponse] using a limited number of goroutines.
func NewAsyncSharderChan(ctx context.Context, concurrentReqs int, reqs <-chan Request, resps Responses[combiner.PipelineResponse], next AsyncRoundTripper[combiner.PipelineResponse]) Responses[combiner.PipelineResponse] {
	if concurrentReqs == 0 {
		panic("NewAsyncSharderChan: concurrentReqs must be greater than 0")
	}

	wg := &sync.WaitGroup{}
	asyncResp := newAsyncResponse()

	for i := 0; i < concurrentReqs; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for req := range reqs {
				if err := req.Context().Err(); err != nil {
					asyncResp.SendError(err)
					continue
				}

				resp, err := next.RoundTrip(req)
				if err != nil {
					asyncResp.SendError(err)
					continue
				}

				asyncResp.Send(ctx, resp)
			}
		}()
	}

	go func() {
		// send any responses back the caller would like to send
		if resps != nil {
			asyncResp.Send(ctx, resps)
		}

		wg.Wait()
		asyncResp.SendComplete()
	}()

	return asyncResp
}
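The concurrency pattern in the source above can be tried in isolation. The sketch below is a simplified analogue, not Tempo code: `job`, `result`, `shard` and `run` are hypothetical names, a plain channel stands in for the `Responses` stream, and closing that channel plays the role of `SendComplete`. It keeps the three properties visible in the source: a fixed number of workers drain one input channel, a request whose context is already done is reported as an error without being forwarded, and completion is signalled only after every worker has returned.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// job is a hypothetical stand-in for the pipeline's Request type.
type job struct {
	ctx context.Context
	id  int
}

// result is a hypothetical stand-in for one item on the response stream.
type result struct {
	id  int
	err error
}

// shard drains jobs with exactly `workers` goroutines. The returned channel
// is closed once all workers have exited, which mirrors the wg.Wait followed
// by SendComplete in the source.
func shard(workers int, jobs <-chan job, handle func(job) error) <-chan result {
	if workers == 0 {
		panic("shard: workers must be greater than 0")
	}

	out := make(chan result)
	wg := &sync.WaitGroup{}

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				// Report already-cancelled jobs without calling handle.
				if err := j.ctx.Err(); err != nil {
					out <- result{id: j.id, err: err}
					continue
				}
				out <- result{id: j.id, err: handle(j)}
			}
		}()
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

// run feeds n jobs to the pool, cancelling every odd-numbered one up front,
// and returns how many succeeded and how many failed.
func run(workers, n int) (ok, failed int) {
	cancelled, cancel := context.WithCancel(context.Background())
	cancel()

	jobs := make(chan job)
	go func() {
		defer close(jobs) // workers only exit once the input channel is closed
		for i := 0; i < n; i++ {
			ctx := context.Background()
			if i%2 == 1 {
				ctx = cancelled
			}
			jobs <- job{ctx: ctx, id: i}
		}
	}()

	for r := range shard(workers, jobs, func(job) error { return nil }) {
		if r.err != nil {
			failed++
		} else {
			ok++
		}
	}
	return ok, failed
}

func main() {
	ok, failed := run(3, 10)
	fmt.Println(ok, failed) // prints "5 5"
}
```

Because each worker loops with `range` over the input channel, the producer has to close that channel; otherwise the workers never return and completion is never signalled. The same holds for the `reqs` channel passed to NewAsyncSharderChan.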

Callers 5

RoundTrip · Method · 0.92
RoundTrip · Method · 0.92
RoundTrip · Method · 0.92
RoundTrip · Method · 0.85
TestAsyncSharders · Function · 0.85

Calls 9

newAsyncResponse · Function · 0.85
SendError · Method · 0.80
SendComplete · Method · 0.80
Add · Method · 0.65
Done · Method · 0.65
Context · Method · 0.65
RoundTrip · Method · 0.65
Send · Method · 0.65
Wait · Method · 0.65

Tested by 2

RoundTrip · Method · 0.68
TestAsyncSharders · Function · 0.68