hub / github.com/grafana/dskit / record

Method record

ring/batch.go:203–259
(itemTrackers []*itemTracker, err error, isClientError func(error) bool)


func (b *batchTracker) record(itemTrackers []*itemTracker, err error, isClientError func(error) bool) {
	// If we reach the required number of successful puts on this item, then decrement the
	// number of pending items by one.
	//
	// The use of atomic increments here is needed as:
	// * rpcsPending and rpcsFailed guarantee only a single goroutine will write to either channel
	// * succeeded, failedClient, failedServer and remaining guarantee that the "return decision" is made atomically
	//   avoiding race condition
	for _, it := range itemTrackers {
		if err != nil {
			// Track the number of errors by error family, and if it exceeds maxFailures
			// shortcut the waiting rpc.
			errCount := it.recordError(err, isClientError)
			// We should return an error if we reach the maxFailure (quorum) on a given error family OR
			// we don't have any remaining instances to try. In the following we use ClientError and ServerError
			// to denote errors, for which isClientError() returns true and false respectively.
			//
			// Ex: Success, ClientError, ServerError -> return ServerError
			// Ex: ClientError, ClientError, Success -> return ClientError
			// Ex: ServerError, Success, ServerError -> return ServerError
			//
			// The reason for searching for quorum in ClientError and ServerError errors separately is to give a more accurate
			// response to the initial request. So if a quorum of instances rejects the request with ClientError, then the request should be rejected
			// even if less-than-quorum instances indicated a failure to process the request (via ServerError).
			// The speculation is that had the unavailable instances been available,
			// they would have rejected the request with a ClientError as well.
			// Conversely, if a quorum of instances failed to process the request via ServerError and less-than-quorum
			// instances rejected it with ClientError, then we do not have quorum to reject the request as a ClientError. Instead,
			// we return the last ServerError error for debuggability.
			if errCount > int32(it.maxFailures) || it.remaining.Dec() == 0 {
				if b.rpcsFailed.Inc() == 1 {
					b.err <- err
				}
			}
		} else {
			// If we successfully process items in minSuccess instances,
			// then wake up the waiting rpc, so it can return early.
			succeeded := it.succeeded.Inc()
			if succeeded == int32(it.minSuccess) {
				if b.rpcsPending.Dec() == 0 {
					b.done <- struct{}{}
				}
				continue
			}

			// If we successfully called this particular instance, but we don't have any remaining instances to try,
			// and we failed to call minSuccess instances, then we need to return the last error.
			if succeeded < int32(it.minSuccess) {
				if it.remaining.Dec() == 0 {
					if b.rpcsFailed.Inc() == 1 {
						b.err <- it.err.Load()
					}
				}
			}
		}
	}
}
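The per-item decision described in the comments above can be traced with a small standalone model. This is a minimal sketch, not the dskit implementation: the `tracker` type, `observe`, `decide`, `errClient`, `errServer`, and `isClient` are hypothetical names introduced for illustration, the channels and batch-level counters are left out, and the standard library's `sync/atomic` (Go 1.19+) stands in for the atomic counter type used in the source. It assumes three instances with `minSuccess = 2` and `maxFailures = 1`.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var (
	errClient = errors.New("client error") // hypothetical error for which isClient returns true
	errServer = errors.New("server error") // hypothetical error for which isClient returns false
)

func isClient(err error) bool { return errors.Is(err, errClient) }

// tracker is a simplified, hypothetical stand-in for the per-item state.
type tracker struct {
	minSuccess, maxFailures               int32
	succeeded, failedClient, failedServer atomic.Int32
	remaining                             atomic.Int32
	lastErr                               atomic.Pointer[error]
}

func newTracker(instances, minSuccess, maxFailures int32) *tracker {
	t := &tracker{minSuccess: minSuccess, maxFailures: maxFailures}
	t.remaining.Store(instances)
	return t
}

func (t *tracker) lastError() error {
	if p := t.lastErr.Load(); p != nil {
		return *p
	}
	return nil
}

// observe records one instance response and reports whether that response
// settles the item's outcome, and with which error (nil means success).
func (t *tracker) observe(err error) (bool, error) {
	if err != nil {
		t.lastErr.Store(&err)
		// Count errors per family so each family reaches quorum on its own.
		var count int32
		if isClient(err) {
			count = t.failedClient.Add(1)
		} else {
			count = t.failedServer.Add(1)
		}
		// Fail on a quorum within one family, or when no instances remain.
		if count > t.maxFailures || t.remaining.Add(-1) == 0 {
			return true, err
		}
		return false, nil
	}
	succeeded := t.succeeded.Add(1)
	if succeeded == t.minSuccess {
		return true, nil
	}
	// A success that cannot reach minSuccess and exhausts the instances
	// surfaces the last recorded error.
	if succeeded < t.minSuccess && t.remaining.Add(-1) == 0 {
		return true, t.lastError()
	}
	return false, nil
}

// decide feeds responses in order and returns the first settled outcome.
func decide(responses []error) (bool, error) {
	t := newTracker(3, 2, 1)
	for _, r := range responses {
		if done, err := t.observe(r); done {
			return true, err
		}
	}
	return false, nil
}

func main() {
	_, err := decide([]error{nil, errClient, errServer})
	fmt.Println(err) // server error
	_, err = decide([]error{errClient, errClient, nil})
	fmt.Println(err) // client error
	_, err = decide([]error{errServer, nil, errServer})
	fmt.Println(err) // server error
}
```

In the first sequence no family reaches quorum, so the outcome is settled only when the last remaining instance responds, which is why the final error wins. In the real method the same decision can fire more than once across goroutines, which is what the `rpcsFailed.Inc() == 1` and `rpcsPending.Dec() == 0` guards protect against.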

Callers 2

DoBatchWithOptions · Function · 0.95
executeScenario · Function · 0.80

Calls 1

recordError · Method · 0.80

Tested by 1

executeScenario · Function · 0.64