forPartitionRingReplicaSets runs f, in parallel, for all live-store instances in the input replicationSets. Returns an error if any f fails for any of the input replicationSets.
(ctx context.Context, q *Querier, replicationSets []ring.ReplicationSet, f func(context.Context, TClient) (R, error))
| 11 | // forPartitionRingReplicaSets runs f, in parallel, for all live-store instances in the input replicationSets. |
| 12 | // Return an error if any f fails for any of the input replicationSets. |
| 13 | func forPartitionRingReplicaSets[R any, TClient any](ctx context.Context, q *Querier, replicationSets []ring.ReplicationSet, f func(context.Context, TClient) (R, error)) ([]R, error) { |
| 14 | wrappedF := func(ctx context.Context, instance *ring.InstanceDesc) (R, error) { |
| 15 | client, err := q.liveStorePool.GetClientForInstance(*instance) |
| 16 | if err != nil { |
| 17 | var empty R |
| 18 | return empty, err |
| 19 | } |
| 20 | |
| 21 | return f(ctx, client.(TClient)) |
| 22 | } |
| 23 | |
| 24 | cleanup := func(_ R) { |
| 25 | // Nothing to do. |
| 26 | } |
| 27 | |
| 28 | quorumConfig := q.queryQuorumConfigForReplicationSets(ctx, replicationSets) |
| 29 | |
| 30 | return concurrency.ForEachJobMergeResults(ctx, replicationSets, 0, func(ctx context.Context, set ring.ReplicationSet) ([]R, error) { |
| 31 | return ring.DoUntilQuorum(ctx, set, quorumConfig, wrappedF, cleanup) |
| 32 | }) |
| 33 | } |
| 34 | |
| 35 | // queryQuorumConfigForReplicationSets returns the config to use with "do until quorum" functions when running queries. |
| 36 | func (q *Querier) queryQuorumConfigForReplicationSets(_ context.Context, _ []ring.ReplicationSet) ring.DoUntilQuorumConfig { |
no test coverage detected