(ctx context.Context, period time.Duration)
| 371 | } |
| 372 | |
| 373 | func (l *BasicLifecycler) waitStableTokens(ctx context.Context, period time.Duration) error { |
| 374 | heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(l.cfg.HeartbeatPeriod) |
| 375 | defer heartbeatTickerStop() |
| 376 | |
| 377 | // The first observation will occur after the specified period. |
| 378 | level.Info(l.logger).Log("msg", "waiting stable tokens", "ring", l.ringName) |
| 379 | observeChan := time.After(period) |
| 380 | |
| 381 | for { |
| 382 | select { |
| 383 | case <-observeChan: |
| 384 | if !l.verifyTokens(ctx) { |
| 385 | // The verification has failed |
| 386 | level.Info(l.logger).Log("msg", "tokens verification failed, keep observing", "ring", l.ringName) |
| 387 | observeChan = time.After(period) |
| 388 | break |
| 389 | } |
| 390 | |
| 391 | level.Info(l.logger).Log("msg", "tokens verification succeeded", "ring", l.ringName) |
| 392 | return nil |
| 393 | |
| 394 | case <-heartbeatTickerChan: |
| 395 | l.heartbeat(ctx) |
| 396 | |
| 397 | case <-ctx.Done(): |
| 398 | return ctx.Err() |
| 399 | } |
| 400 | } |
| 401 | } |
| 402 | |
| 403 | // Verifies that tokens that this instance has registered to the ring still belong to it. |
| 404 | // Gossiping ring may change the ownership of tokens in case of conflicts. |
no test coverage detected