NewAsyncSharderChan creates a new AsyncResponse that shards requests to the next AsyncRoundTripper[combiner.PipelineResponse] using a limited number of goroutines.
(ctx context.Context, concurrentReqs int, reqs <-chan Request, resps Responses[combiner.PipelineResponse], next AsyncRoundTripper[combiner.PipelineResponse])
| 63 | |
| 64 | // NewAsyncSharderChan creates a new AsyncResponse that shards requests to the next AsyncRoundTripper[combiner.PipelineResponse] using a limited number of goroutines. |
| 65 | func NewAsyncSharderChan(ctx context.Context, concurrentReqs int, reqs <-chan Request, resps Responses[combiner.PipelineResponse], next AsyncRoundTripper[combiner.PipelineResponse]) Responses[combiner.PipelineResponse] { |
| 66 | if concurrentReqs == 0 { |
| 67 | panic("NewAsyncSharderChan: concurrentReqs must be greater than 0") |
| 68 | } |
| 69 | |
| 70 | wg := &sync.WaitGroup{} |
| 71 | asyncResp := newAsyncResponse() |
| 72 | |
| 73 | for i := 0; i < concurrentReqs; i++ { |
| 74 | wg.Add(1) |
| 75 | go func() { |
| 76 | defer wg.Done() |
| 77 | for req := range reqs { |
| 78 | if err := req.Context().Err(); err != nil { |
| 79 | asyncResp.SendError(err) |
| 80 | continue |
| 81 | } |
| 82 | |
| 83 | resp, err := next.RoundTrip(req) |
| 84 | if err != nil { |
| 85 | asyncResp.SendError(err) |
| 86 | continue |
| 87 | } |
| 88 | |
| 89 | asyncResp.Send(ctx, resp) |
| 90 | } |
| 91 | }() |
| 92 | } |
| 93 | |
| 94 | go func() { |
| 95 | // send any responses back the caller would like to send |
| 96 | if resps != nil { |
| 97 | asyncResp.Send(ctx, resps) |
| 98 | } |
| 99 | |
| 100 | wg.Wait() |
| 101 | asyncResp.SendComplete() |
| 102 | }() |
| 103 | |
| 104 | return asyncResp |
| 105 | } |