ForEachShard concurrently calls the fn on each live shard in the ring. It returns the first error if any.
func (c *Ring) ForEachShard(ctx context.Context, fn func(ctx context.Context, client *Client) error) error
| 721 | // ForEachShard concurrently calls the fn on each live shard in the ring. |
| 722 | // It returns the first error if any. |
| 723 | func (c *Ring) ForEachShard( |
| 724 | ctx context.Context, |
| 725 | fn func(ctx context.Context, client *Client) error, |
| 726 | ) error { |
| 727 | // note: `c.List()` return a shadow copy of `[]*ringShard`. |
| 728 | shards := c.sharding.List() |
| 729 | var wg sync.WaitGroup |
| 730 | errCh := make(chan error, 1) |
| 731 | for _, shard := range shards { |
| 732 | if shard.IsDown() { |
| 733 | continue |
| 734 | } |
| 735 | |
| 736 | wg.Add(1) |
| 737 | go func(shard *ringShard) { |
| 738 | defer wg.Done() |
| 739 | err := fn(ctx, shard.Client) |
| 740 | if err != nil { |
| 741 | select { |
| 742 | case errCh <- err: |
| 743 | default: |
| 744 | } |
| 745 | } |
| 746 | }(shard) |
| 747 | } |
| 748 | wg.Wait() |
| 749 | |
| 750 | select { |
| 751 | case err := <-errCh: |
| 752 | return err |
| 753 | default: |
| 754 | return nil |
| 755 | } |
| 756 | } |
| 757 | |
| 758 | func (c *Ring) cmdsInfo(ctx context.Context) (map[string]*CommandInfo, error) { |
| 759 | // note: `c.List()` return a shadow copy of `[]*ringShard`. |