hub / github.com/grafana/dskit / ForEachUser

Function ForEachUser

concurrency/runner.go:16–61

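ForEachUser runs the provided userFunc once for each ID in userIDs, using up to concurrency concurrent workers. If userFunc returns an error, ForEachUser continues processing the remaining users and then returns an error that aggregates every error userFunc returned. If ctx is canceled, workers stop picking up new users and ForEachUser returns ctx.Err() instead.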

(ctx context.Context, userIDs []string, concurrency int, userFunc func(ctx context.Context, userID string) error)
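The behavior described above can be sketched with the standard library alone. This is an illustrative stand-in, not the dskit code: `forEachUser` and `processAll` are hypothetical names, and `errors.Join` (Go 1.20+) is swapped in for dskit's `multierror.MultiError`. The per-user job and the user IDs are made up for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// forEachUser is a stdlib-only stand-in that mirrors the documented behavior.
// Assumption: errors.Join replaces multierror.MultiError for aggregation.
func forEachUser(ctx context.Context, userIDs []string, concurrency int, userFunc func(ctx context.Context, userID string) error) error {
	if len(userIDs) == 0 {
		return nil
	}

	// Queue every user ID up front so workers can simply drain the channel.
	ch := make(chan string, len(userIDs))
	for _, id := range userIDs {
		ch <- id
	}
	close(ch)

	var (
		mu   sync.Mutex
		errs []error
		wg   sync.WaitGroup
	)
	for i := 0; i < concurrency && i < len(userIDs); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range ch {
				// Stop taking new work once the context is canceled.
				if ctx.Err() != nil {
					return
				}
				if err := userFunc(ctx, id); err != nil {
					mu.Lock()
					errs = append(errs, err)
					mu.Unlock()
				}
			}
		}()
	}
	wg.Wait()

	if ctx.Err() != nil {
		return ctx.Err()
	}
	return errors.Join(errs...) // nil when no errors were collected
}

// processAll runs a hypothetical per-user job with two workers and reports
// how many users were visited along with the aggregated error.
func processAll(userIDs []string) (visited int, err error) {
	var mu sync.Mutex
	err = forEachUser(context.Background(), userIDs, 2, func(_ context.Context, userID string) error {
		mu.Lock()
		visited++
		mu.Unlock()
		if userID == "user-2" {
			return fmt.Errorf("processing %s failed", userID)
		}
		return nil
	})
	return visited, err
}

func main() {
	visited, err := processAll([]string{"user-1", "user-2", "user-3"})
	fmt.Println("visited:", visited)
	fmt.Println("error:", err)
}
```

In this sketch the failure for `user-2` does not stop the run: all three users are visited, and the failure is reported only after every worker has finished.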

Source from the content-addressed store, hash-verified

// In case userFunc returns error, it will continue to process remaining users but returns an
// error with all errors userFunc has returned.
func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFunc func(ctx context.Context, userID string) error) error {
	if len(userIDs) == 0 {
		return nil
	}

	// Push all jobs to a channel.
	ch := make(chan string, len(userIDs))
	for _, userID := range userIDs {
		ch <- userID
	}
	close(ch)

	// Keep track of all errors occurred.
	errs := multierror.MultiError{}
	errsMx := sync.Mutex{}

	wg := sync.WaitGroup{}
	for ix := 0; ix < min(concurrency, len(userIDs)); ix++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			for userID := range ch {
				// Ensure the context has not been canceled (ie. shutdown has been triggered).
				if ctx.Err() != nil {
					break
				}

				if err := userFunc(ctx, userID); err != nil {
					errsMx.Lock()
					errs.Add(err)
					errsMx.Unlock()
				}
			}
		}()
	}

	// wait for ongoing workers to finish.
	wg.Wait()

	if ctx.Err() != nil {
		return ctx.Err()
	}

	return errs.Err()
}

// ForEach runs the provided jobFunc for each job up to concurrency concurrent workers.
// The execution breaks on first error encountered.
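One detail of the ForEachUser listing above is easy to miss: after the workers finish, a canceled context takes precedence over any errors collected so far. The sketch below illustrates that ordering under stated assumptions. `runAll` and `cancelAfterFirst` are hypothetical names, `errors.Join` stands in for `multierror.MultiError`, and a single worker is used so the call count is deterministic.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// runAll is a compact stand-in for the worker loop in the listing
// (not the dskit code itself).
func runAll(ctx context.Context, ids []string, workers int, fn func(context.Context, string) error) error {
	ch := make(chan string, len(ids))
	for _, id := range ids {
		ch <- id
	}
	close(ch)

	var (
		mu   sync.Mutex
		errs []error
		wg   sync.WaitGroup
	)
	for i := 0; i < workers && i < len(ids); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range ch {
				if ctx.Err() != nil {
					return // canceled: leave remaining IDs unprocessed
				}
				if err := fn(ctx, id); err != nil {
					mu.Lock()
					errs = append(errs, err)
					mu.Unlock()
				}
			}
		}()
	}
	wg.Wait()

	// The context error wins over anything collected in errs.
	if ctx.Err() != nil {
		return ctx.Err()
	}
	return errors.Join(errs...)
}

// cancelAfterFirst cancels the context from inside the first call and
// reports how many calls were made and which error came back.
func cancelAfterFirst() (calls int, err error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	err = runAll(ctx, []string{"a", "b", "c"}, 1, func(_ context.Context, id string) error {
		calls++ // safe without a lock: only one worker runs
		cancel()
		return errors.New("failed for " + id)
	})
	return calls, err
}

func main() {
	calls, err := cancelAfterFirst()
	fmt.Println("calls:", calls)
	fmt.Println("canceled:", errors.Is(err, context.Canceled))
}
```

Here only the first ID is processed, and the error it returned is dropped in favor of `context.Canceled`. Callers that need the per-user errors even during shutdown would have to record them inside the callback.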

Calls 6

Add · Method · 0.95
Err · Method · 0.95
Add · Method · 0.65
Done · Method · 0.65
Err · Method · 0.45
Wait · Method · 0.45