ForEachSlave concurrently calls fn on each slave node in the cluster. It waits for all calls to finish and then returns the first error that was reported, if any.
( ctx context.Context, fn func(ctx context.Context, client *Client) error, )
| 1379 | // ForEachSlave concurrently calls the fn on each slave node in the cluster. |
| 1380 | // It returns the first error if any. |
| 1381 | func (c *ClusterClient) ForEachSlave( |
| 1382 | ctx context.Context, |
| 1383 | fn func(ctx context.Context, client *Client) error, |
| 1384 | ) error { |
| 1385 | state, err := c.state.ReloadOrGet(ctx) |
| 1386 | if err != nil { |
| 1387 | return err |
| 1388 | } |
| 1389 | |
| 1390 | var wg sync.WaitGroup |
| 1391 | errCh := make(chan error, 1) |
| 1392 | |
| 1393 | for _, slave := range state.Slaves { |
| 1394 | wg.Add(1) |
| 1395 | go func(node *clusterNode) { |
| 1396 | defer wg.Done() |
| 1397 | err := fn(ctx, node.Client) |
| 1398 | if err != nil { |
| 1399 | select { |
| 1400 | case errCh <- err: |
| 1401 | default: |
| 1402 | } |
| 1403 | } |
| 1404 | }(slave) |
| 1405 | } |
| 1406 | |
| 1407 | wg.Wait() |
| 1408 | |
| 1409 | select { |
| 1410 | case err := <-errCh: |
| 1411 | return err |
| 1412 | default: |
| 1413 | return nil |
| 1414 | } |
| 1415 | } |
| 1416 | |
| 1417 | // ForEachShard concurrently calls the fn on each known node in the cluster. |
| 1418 | // It returns the first error if any. |
no test coverage detected