MCPcopy
hub / github.com/redis/go-redis / ForEachShard

Method ForEachShard

osscluster.go:1419–1459  ·  view source on GitHub ↗

ForEachShard concurrently calls the fn on each known node in the cluster. It returns the first error if any.

(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
)

Source from the content-addressed store, hash-verified

1417// ForEachShard concurrently calls the fn on each known node in the cluster.
1418// It returns the first error if any.
1419func (c *ClusterClient) ForEachShard(
1420 ctx context.Context,
1421 fn func(ctx context.Context, client *Client) error,
1422) error {
1423 state, err := c.state.ReloadOrGet(ctx)
1424 if err != nil {
1425 return err
1426 }
1427
1428 var wg sync.WaitGroup
1429 errCh := make(chan error, 1)
1430
1431 worker := func(node *clusterNode) {
1432 defer wg.Done()
1433 err := fn(ctx, node.Client)
1434 if err != nil {
1435 select {
1436 case errCh <- err:
1437 default:
1438 }
1439 }
1440 }
1441
1442 for _, node := range state.Masters {
1443 wg.Add(1)
1444 go worker(node)
1445 }
1446 for _, node := range state.Slaves {
1447 wg.Add(1)
1448 go worker(node)
1449 }
1450
1451 wg.Wait()
1452
1453 select {
1454 case err := <-errCh:
1455 return err
1456 default:
1457 return nil
1458 }
1459}
1460
1461// PoolStats returns accumulated connection pool stats.
1462func (c *ClusterClient) PoolStats() *PoolStats {

Callers 4

ScriptLoadMethod · 0.95
ScriptFlushMethod · 0.95
ScriptExistsMethod · 0.95

Calls 3

ReloadOrGetMethod · 0.80
WaitMethod · 0.80
AddMethod · 0.65

Tested by 1