MCPcopy
hub / github.com/grafana/dskit / DoBatchWithOptions

Function DoBatchWithOptions

ring/batch.go:114–201  ·  view source on GitHub ↗

DoBatchWithOptions request against a set of keys in the ring, handling replication and failures. For example if we want to write N items where they may all hit different instances, and we want them all replicated R ways with quorum writes, we track the relationship between batch RPCs and the items w

(ctx context.Context, op Operation, r DoBatchRing, keys []uint32, callback func(InstanceDesc, []int) error, o DoBatchOptions)

Source from the content-addressed store, hash-verified

112//
113// Not implemented as a method on Ring, so we can test separately.
114func DoBatchWithOptions(ctx context.Context, op Operation, r DoBatchRing, keys []uint32, callback func(InstanceDesc, []int) error, o DoBatchOptions) error {
115 o.replaceZeroValuesWithDefaults()
116
117 if r.InstancesCount() <= 0 {
118 o.Cleanup()
119 return fmt.Errorf("DoBatch: InstancesCount <= 0")
120 }
121 expectedTrackersPerInstance := len(keys) * (r.ReplicationFactor() + 1) / r.InstancesCount()
122 itemTrackers := make([]itemTracker, len(keys))
123 instances := make(map[string]instance, r.InstancesCount())
124
125 var (
126 bufDescs [GetBufferSize]InstanceDesc
127 bufHosts [GetBufferSize]string
128 bufZones [GetBufferSize]string
129 )
130 for i, key := range keys {
131 // Get call below takes ~1 microsecond for ~500 instances.
132 // Checking every 10K calls would be every 10ms.
133 if i%10e3 == 0 {
134 if err := context.Cause(ctx); err != nil {
135 o.Cleanup()
136 return err
137 }
138 }
139
140 replicationSet, err := r.Get(key, op, bufDescs[:0], bufHosts[:0], bufZones[:0])
141 if err != nil {
142 o.Cleanup()
143 return err
144 }
145 itemTrackers[i].minSuccess = len(replicationSet.Instances) - replicationSet.MaxErrors
146 itemTrackers[i].maxFailures = replicationSet.MaxErrors
147 itemTrackers[i].remaining.Store(int32(len(replicationSet.Instances)))
148
149 for _, desc := range replicationSet.Instances {
150 curr, found := instances[desc.Addr]
151 if !found {
152 curr.itemTrackers = make([]*itemTracker, 0, expectedTrackersPerInstance)
153 curr.indexes = make([]int, 0, expectedTrackersPerInstance)
154 }
155 instances[desc.Addr] = instance{
156 desc: desc,
157 itemTrackers: append(curr.itemTrackers, &itemTrackers[i]),
158 indexes: append(curr.indexes, i),
159 }
160 }
161 }
162
163 // One last check before calling the callbacks: it doesn't make sense if context is canceled.
164 if err := context.Cause(ctx); err != nil {
165 o.Cleanup()
166 return err
167 }
168
169 tracker := batchTracker{
170 done: make(chan struct{}, 1),
171 err: make(chan error, 1),

Calls 10

recordMethod · 0.95
ErrorfMethod · 0.80
GoMethod · 0.80
InstancesCountMethod · 0.65
ReplicationFactorMethod · 0.65
GetMethod · 0.65
AddMethod · 0.65
DoneMethod · 0.65
WaitMethod · 0.45