DoUntilQuorumWithoutSuccessfulContextCancellation behaves the same as DoUntilQuorum, except it does not cancel the context.Context passed to invocations of f whose results are returned. For example, this is useful in situations where DoUntilQuorumWithoutSuccessfulContextCancellation is used to esta
(ctx context.Context, r ReplicationSet, cfg DoUntilQuorumConfig, f func(context.Context, *InstanceDesc, context.CancelCauseFunc) (T, error), cleanupFunc func(T))
| 242 | // |
| 243 | // Failing to do this may result in a memory leak. |
| 244 | func DoUntilQuorumWithoutSuccessfulContextCancellation[T any](ctx context.Context, r ReplicationSet, cfg DoUntilQuorumConfig, f func(context.Context, *InstanceDesc, context.CancelCauseFunc) (T, error), cleanupFunc func(T)) ([]T, error) { |
| 245 | if err := cfg.Validate(); err != nil { |
| 246 | return nil, err |
| 247 | } |
| 248 | |
| 249 | if r.ZoneAwarenessEnabled && r.MaxErrors > 0 { |
| 250 | return nil, fmt.Errorf("invalid ReplicationSet: MaxErrors is non-zero (is %v) and ZoneAwarenessEnabled is true", r.MaxErrors) |
| 251 | } |
| 252 | |
| 253 | var logger kitlog.Logger = cfg.Logger |
| 254 | if cfg.Logger == nil { |
| 255 | logger = kitlog.NewNopLogger() |
| 256 | } |
| 257 | |
| 258 | if cfg.IncludeReplicaCount { |
| 259 | ctx = ContextWithAvailableReplicas(ctx, len(r.Instances)) |
| 260 | } |
| 261 | |
| 262 | resultsChan := make(chan instanceResult[T], len(r.Instances)) |
| 263 | resultsRemaining := len(r.Instances) |
| 264 | |
| 265 | defer func() { |
| 266 | go func() { |
| 267 | for resultsRemaining > 0 { |
| 268 | result := <-resultsChan |
| 269 | resultsRemaining-- |
| 270 | |
| 271 | if result.err == nil { |
| 272 | cleanupFunc(result.result) |
| 273 | } |
| 274 | } |
| 275 | }() |
| 276 | }() |
| 277 | |
| 278 | var resultTracker replicationSetResultTracker |
| 279 | var contextTracker replicationSetContextTracker |
| 280 | if r.MaxUnavailableZones > 0 || r.ZoneAwarenessEnabled { |
| 281 | resultTracker = newZoneAwareResultTracker(r.Instances, r.MaxUnavailableZones, cfg.ZoneSorter, logger) |
| 282 | contextTracker = newZoneAwareContextTracker(ctx, r.Instances) |
| 283 | } else { |
| 284 | resultTracker = newDefaultResultTracker(r.Instances, r.MaxErrors, logger) |
| 285 | contextTracker = newDefaultContextTracker(ctx, r.Instances) |
| 286 | } |
| 287 | |
| 288 | if cfg.MinimizeRequests { |
| 289 | resultTracker.startMinimumRequests() |
| 290 | } else { |
| 291 | resultTracker.startAllRequests() |
| 292 | } |
| 293 | |
| 294 | for i := range r.Instances { |
| 295 | instance := &r.Instances[i] |
| 296 | ctx, ctxCancel := contextTracker.contextFor(instance) |
| 297 | |
| 298 | go func(desc *InstanceDesc) { |
| 299 | if err := resultTracker.awaitStart(ctx, desc); err != nil { |
| 300 | // Post to resultsChan so that the deferred cleanup handler above eventually terminates. |
| 301 | resultsChan <- instanceResult[T]{ |