MCPcopy
hub / github.com/grafana/dskit / DoUntilQuorumWithoutSuccessfulContextCancellation

Function DoUntilQuorumWithoutSuccessfulContextCancellation

ring/replication_set.go:244–401  ·  view source on GitHub ↗

DoUntilQuorumWithoutSuccessfulContextCancellation behaves the same as DoUntilQuorum, except it does not cancel the context.Context passed to invocations of f whose results are returned. For example, this is useful in situations where DoUntilQuorumWithoutSuccessfulContextCancellation is used to establish a set of streams that will be used after DoUntilQuorumWithoutSuccessfulContextCancellation returns.

(ctx context.Context, r ReplicationSet, cfg DoUntilQuorumConfig, f func(context.Context, *InstanceDesc, context.CancelCauseFunc) (T, error), cleanupFunc func(T))

Source from the content-addressed store, hash-verified

242//
243// Failing to do this may result in a memory leak.
244func DoUntilQuorumWithoutSuccessfulContextCancellation[T any](ctx context.Context, r ReplicationSet, cfg DoUntilQuorumConfig, f func(context.Context, *InstanceDesc, context.CancelCauseFunc) (T, error), cleanupFunc func(T)) ([]T, error) {
245 if err := cfg.Validate(); err != nil {
246 return nil, err
247 }
248
249 if r.ZoneAwarenessEnabled && r.MaxErrors > 0 {
250 return nil, fmt.Errorf("invalid ReplicationSet: MaxErrors is non-zero (is %v) and ZoneAwarenessEnabled is true", r.MaxErrors)
251 }
252
253 var logger kitlog.Logger = cfg.Logger
254 if cfg.Logger == nil {
255 logger = kitlog.NewNopLogger()
256 }
257
258 if cfg.IncludeReplicaCount {
259 ctx = ContextWithAvailableReplicas(ctx, len(r.Instances))
260 }
261
262 resultsChan := make(chan instanceResult[T], len(r.Instances))
263 resultsRemaining := len(r.Instances)
264
265 defer func() {
266 go func() {
267 for resultsRemaining > 0 {
268 result := <-resultsChan
269 resultsRemaining--
270
271 if result.err == nil {
272 cleanupFunc(result.result)
273 }
274 }
275 }()
276 }()
277
278 var resultTracker replicationSetResultTracker
279 var contextTracker replicationSetContextTracker
280 if r.MaxUnavailableZones > 0 || r.ZoneAwarenessEnabled {
281 resultTracker = newZoneAwareResultTracker(r.Instances, r.MaxUnavailableZones, cfg.ZoneSorter, logger)
282 contextTracker = newZoneAwareContextTracker(ctx, r.Instances)
283 } else {
284 resultTracker = newDefaultResultTracker(r.Instances, r.MaxErrors, logger)
285 contextTracker = newDefaultContextTracker(ctx, r.Instances)
286 }
287
288 if cfg.MinimizeRequests {
289 resultTracker.startMinimumRequests()
290 } else {
291 resultTracker.startAllRequests()
292 }
293
294 for i := range r.Instances {
295 instance := &r.Instances[i]
296 ctx, ctxCancel := contextTracker.contextFor(instance)
297
298 go func(desc *InstanceDesc) {
299 if err := resultTracker.awaitStart(ctx, desc); err != nil {
300 // Post to resultsChan so that the deferred cleanup handler above eventually terminates.
301 resultsChan <- instanceResult[T]{

Calls 15

startMinimumRequestsMethod · 0.95
startAllRequestsMethod · 0.95
contextForMethod · 0.95
awaitStartMethod · 0.95
cancelAllContextsMethod · 0.95
succeededMethod · 0.95
doneMethod · 0.95
cancelContextForMethod · 0.95
failedMethod · 0.95
NewErrorFunction · 0.92