ForEachUser runs the provided userFunc for each userIDs up to concurrency concurrent workers. In case userFunc returns error, it will continue to process remaining users but returns an error with all errors userFunc has returned.
(ctx context.Context, userIDs []string, concurrency int, userFunc func(ctx context.Context, userID string) error)
| 14 | // In case userFunc returns error, it will continue to process remaining users but returns an |
| 15 | // error with all errors userFunc has returned. |
| 16 | func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFunc func(ctx context.Context, userID string) error) error { |
| 17 | if len(userIDs) == 0 { |
| 18 | return nil |
| 19 | } |
| 20 | |
| 21 | // Push all jobs to a channel. |
| 22 | ch := make(chan string, len(userIDs)) |
| 23 | for _, userID := range userIDs { |
| 24 | ch <- userID |
| 25 | } |
| 26 | close(ch) |
| 27 | |
| 28 | // Keep track of all errors occurred. |
| 29 | errs := multierror.MultiError{} |
| 30 | errsMx := sync.Mutex{} |
| 31 | |
| 32 | wg := sync.WaitGroup{} |
| 33 | for ix := 0; ix < min(concurrency, len(userIDs)); ix++ { |
| 34 | wg.Add(1) |
| 35 | go func() { |
| 36 | defer wg.Done() |
| 37 | |
| 38 | for userID := range ch { |
| 39 | // Ensure the context has not been canceled (ie. shutdown has been triggered). |
| 40 | if ctx.Err() != nil { |
| 41 | break |
| 42 | } |
| 43 | |
| 44 | if err := userFunc(ctx, userID); err != nil { |
| 45 | errsMx.Lock() |
| 46 | errs.Add(err) |
| 47 | errsMx.Unlock() |
| 48 | } |
| 49 | } |
| 50 | }() |
| 51 | } |
| 52 | |
| 53 | // wait for ongoing workers to finish. |
| 54 | wg.Wait() |
| 55 | |
| 56 | if ctx.Err() != nil { |
| 57 | return ctx.Err() |
| 58 | } |
| 59 | |
| 60 | return errs.Err() |
| 61 | } |
| 62 | |
| 63 | // ForEach runs the provided jobFunc for each job up to concurrency concurrent workers. |
| 64 | // The execution breaks on first error encountered. |