NewAsyncSharderFunc creates a new AsyncResponse that shards requests to the next AsyncRoundTripper[combiner.PipelineResponse]. It creates one goroutine per concurrent request.
(ctx context.Context, concurrentReqs, totalReqs int, reqFn func(i int) Request, next AsyncRoundTripper[combiner.PipelineResponse])
| 17 | // NewAsyncSharderFunc creates a new AsyncResponse that shards requests to the next AsyncRoundTripper[combiner.PipelineResponse]. It creates one |
| 18 | // goroutine per concurrent request. |
| 19 | func NewAsyncSharderFunc(ctx context.Context, concurrentReqs, totalReqs int, reqFn func(i int) Request, next AsyncRoundTripper[combiner.PipelineResponse]) Responses[combiner.PipelineResponse] { |
| 20 | var wg waitGroup |
| 21 | if concurrentReqs <= 0 { |
| 22 | wg = &sync.WaitGroup{} |
| 23 | } else { |
| 24 | bwg := boundedwaitgroup.New(uint(concurrentReqs)) |
| 25 | wg = &bwg |
| 26 | } |
| 27 | asyncResp := newAsyncResponse() |
| 28 | |
| 29 | go func() { |
| 30 | defer asyncResp.SendComplete() |
| 31 | |
| 32 | for i := 0; i < totalReqs; i++ { |
| 33 | req := reqFn(i) |
| 34 | // else check for a request to pass down the pipeline |
| 35 | if req == nil { |
| 36 | continue |
| 37 | } |
| 38 | |
| 39 | if err := req.Context().Err(); err != nil { |
| 40 | asyncResp.SendError(err) |
| 41 | continue |
| 42 | } |
| 43 | |
| 44 | wg.Add(1) |
| 45 | go func(r Request) { |
| 46 | defer wg.Done() |
| 47 | |
| 48 | resp, err := next.RoundTrip(r) |
| 49 | if err != nil { |
| 50 | asyncResp.SendError(err) |
| 51 | return |
| 52 | } |
| 53 | |
| 54 | asyncResp.Send(ctx, resp) |
| 55 | }(req) |
| 56 | } |
| 57 | |
| 58 | wg.Wait() |
| 59 | }() |
| 60 | |
| 61 | return asyncResp |
| 62 | } |
| 63 | |
| 64 | // NewAsyncSharderChan creates a new AsyncResponse that shards requests to the next AsyncRoundTripper[combiner.PipelineResponse] using a limited number of goroutines. |
| 65 | func NewAsyncSharderChan(ctx context.Context, concurrentReqs int, reqs <-chan Request, resps Responses[combiner.PipelineResponse], next AsyncRoundTripper[combiner.PipelineResponse]) Responses[combiner.PipelineResponse] { |