ForEachShard concurrently calls the fn on each known node in the cluster. It returns the first error if any.
( ctx context.Context, fn func(ctx context.Context, client *Client) error, )
| 1417 | // ForEachShard concurrently calls the fn on each known node in the cluster. |
| 1418 | // It returns the first error if any. |
| 1419 | func (c *ClusterClient) ForEachShard( |
| 1420 | ctx context.Context, |
| 1421 | fn func(ctx context.Context, client *Client) error, |
| 1422 | ) error { |
| 1423 | state, err := c.state.ReloadOrGet(ctx) |
| 1424 | if err != nil { |
| 1425 | return err |
| 1426 | } |
| 1427 | |
| 1428 | var wg sync.WaitGroup |
| 1429 | errCh := make(chan error, 1) |
| 1430 | |
| 1431 | worker := func(node *clusterNode) { |
| 1432 | defer wg.Done() |
| 1433 | err := fn(ctx, node.Client) |
| 1434 | if err != nil { |
| 1435 | select { |
| 1436 | case errCh <- err: |
| 1437 | default: |
| 1438 | } |
| 1439 | } |
| 1440 | } |
| 1441 | |
| 1442 | for _, node := range state.Masters { |
| 1443 | wg.Add(1) |
| 1444 | go worker(node) |
| 1445 | } |
| 1446 | for _, node := range state.Slaves { |
| 1447 | wg.Add(1) |
| 1448 | go worker(node) |
| 1449 | } |
| 1450 | |
| 1451 | wg.Wait() |
| 1452 | |
| 1453 | select { |
| 1454 | case err := <-errCh: |
| 1455 | return err |
| 1456 | default: |
| 1457 | return nil |
| 1458 | } |
| 1459 | } |
| 1460 | |
| 1461 | // PoolStats returns accumulated connection pool stats. |
| 1462 | func (c *ClusterClient) PoolStats() *PoolStats { |