MCPcopy
hub / github.com/grafana/tempo / forPartitionRingReplicaSets

Function forPartitionRingReplicaSets

modules/querier/partition_ring.go:13–33  ·  view source on GitHub ↗

forPartitionRingReplicaSets runs f, in parallel, for all live-store instances in the input replicationSets. Return an error if any f fails for any of the input replicationSets.

(ctx context.Context, q *Querier, replicationSets []ring.ReplicationSet, f func(context.Context, TClient) (R, error))

Source from the content-addressed store, hash-verified

11// forPartitionRingReplicaSets runs f, in parallel, for all live-store instances in the input replicationSets.
12// Return an error if any f fails for any of the input replicationSets.
13func forPartitionRingReplicaSets[R any, TClient any](ctx context.Context, q *Querier, replicationSets []ring.ReplicationSet, f func(context.Context, TClient) (R, error)) ([]R, error) {
14 wrappedF := func(ctx context.Context, instance *ring.InstanceDesc) (R, error) {
15 client, err := q.liveStorePool.GetClientForInstance(*instance)
16 if err != nil {
17 var empty R
18 return empty, err
19 }
20
21 return f(ctx, client.(TClient))
22 }
23
24 cleanup := func(_ R) {
25 // Nothing to do.
26 }
27
28 quorumConfig := q.queryQuorumConfigForReplicationSets(ctx, replicationSets)
29
30 return concurrency.ForEachJobMergeResults(ctx, replicationSets, 0, func(ctx context.Context, set ring.ReplicationSet) ([]R, error) {
31 return ring.DoUntilQuorum(ctx, set, quorumConfig, wrappedF, cleanup)
32 })
33}
34
35// queryQuorumConfigForReplicationSets returns the config to use with "do until quorum" functions when running queries.
36func (q *Querier) queryQuorumConfigForReplicationSets(_ context.Context, _ []ring.ReplicationSet) ring.DoUntilQuorumConfig {

Callers 2

forLiveStoreRingMethod · 0.85

Calls 2

fFunction · 0.85

Tested by

no test coverage detected