MCPcopy
hub / github.com/grafana/tempo / NewAsyncSharderFunc

Function NewAsyncSharderFunc

modules/frontend/pipeline/async_sharding.go:19–62  ·  modules/frontend/pipeline/async_sharding.go::NewAsyncSharderFunc

NewAsyncSharderFunc creates a new AsyncResponse that shards requests to the next AsyncRoundTripper[combiner.PipelineResponse]. It creates one goroutine per concurrent request.

(ctx context.Context, concurrentReqs, totalReqs int, reqFn func(i int) Request, next AsyncRoundTripper[combiner.PipelineResponse])

Source from the content-addressed store, hash-verified

17// NewAsyncSharderFunc creates a new AsyncResponse that shards requests to the next AsyncRoundTripper[combiner.PipelineResponse]. It creates one
18// goroutine per concurrent request.
19func NewAsyncSharderFunc(ctx context.Context, concurrentReqs, totalReqs int, reqFn func(i int) Request, next AsyncRoundTripper[combiner.PipelineResponse]) Responses[combiner.PipelineResponse] {
20 var wg waitGroup
21 if concurrentReqs <= 0 {
22 wg = &sync.WaitGroup{}
23 } else {
24 bwg := boundedwaitgroup.New(uint(concurrentReqs))
25 wg = &bwg
26 }
27 asyncResp := newAsyncResponse()
28
29 go func() {
30 defer asyncResp.SendComplete()
31
32 for i := 0; i < totalReqs; i++ {
33 req := reqFn(i)
34 // else check for a request to pass down the pipeline
35 if req == nil {
36 continue
37 }
38
39 if err := req.Context().Err(); err != nil {
40 asyncResp.SendError(err)
41 continue
42 }
43
44 wg.Add(1)
45 go func(r Request) {
46 defer wg.Done()
47
48 resp, err := next.RoundTrip(r)
49 if err != nil {
50 asyncResp.SendError(err)
51 return
52 }
53
54 asyncResp.Send(ctx, resp)
55 }(req)
56 }
57
58 wg.Wait()
59 }()
60
61 return asyncResp
62}
63
64// NewAsyncSharderChan creates a new AsyncResponse that shards requests to the next AsyncRoundTripper[combiner.PipelineResponse] using a limited number of goroutines.
65func NewAsyncSharderChan(ctx context.Context, concurrentReqs int, reqs <-chan Request, resps Responses[combiner.PipelineResponse], next AsyncRoundTripper[combiner.PipelineResponse]) Responses[combiner.PipelineResponse] {

Callers 4

RoundTripMethod · 0.92
RoundTripMethod · 0.85
RoundTripMethod · 0.85
TestAsyncShardersFunction · 0.85

Calls 10

AddMethod · 0.95
DoneMethod · 0.95
WaitMethod · 0.95
NewFunction · 0.92
newAsyncResponseFunction · 0.85
SendCompleteMethod · 0.80
SendErrorMethod · 0.80
ContextMethod · 0.65
RoundTripMethod · 0.65
SendMethod · 0.65

Tested by 2

RoundTripMethod · 0.68
TestAsyncShardersFunction · 0.68