MCPcopy
hub / github.com/grafana/dskit / loop

Method loop

ring/lifecycler.go:541–616  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

539}
540
541func (i *Lifecycler) loop(ctx context.Context) error {
542 // First, see if we exist in the cluster, update our state to match if we do,
543 // and add ourselves (without tokens) if we don't.
544 if err := i.initRing(ctx); err != nil {
545 return errors.Wrapf(err, "failed to join the ring %s", i.RingName)
546 }
547
548 // We do various periodic tasks
549 autoJoinAfter := time.After(i.cfg.JoinAfter)
550 var observeChan <-chan time.Time
551
552 heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(i.cfg.HeartbeatPeriod)
553 defer heartbeatTickerStop()
554
555 for {
556 select {
557 case <-autoJoinAfter:
558 level.Debug(i.logger).Log("msg", "JoinAfter expired", "ring", i.RingName)
559 // Will only fire once, after auto join timeout. If we haven't entered "JOINING" state,
560 // then pick some tokens and enter ACTIVE state.
561 if i.GetState() == PENDING {
562 level.Info(i.logger).Log("msg", "auto-joining cluster after timeout", "ring", i.RingName)
563
564 if i.cfg.ObservePeriod > 0 {
565 // let's observe the ring. By using JOINING state, this ingester will be ignored by LEAVING
566 // ingesters, but we also signal that it is not fully functional yet.
567 if err := i.autoJoin(ctx, JOINING); err != nil {
568 return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
569 }
570
571 level.Info(i.logger).Log("msg", "observing tokens before going ACTIVE", "ring", i.RingName)
572 observeChan = time.After(i.cfg.ObservePeriod)
573 } else {
574 if err := i.autoJoin(ctx, ACTIVE); err != nil {
575 return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
576 }
577 }
578 }
579
580 case <-observeChan:
581 // if observeChan is nil, this case is ignored. We keep updating observeChan while observing the ring.
582 // When observing is done, observeChan is set to nil.
583
584 observeChan = nil
585 if s := i.GetState(); s != JOINING {
586 level.Error(i.logger).Log("msg", "unexpected state while observing tokens", "state", s, "ring", i.RingName)
587 }
588
589 if i.verifyTokens(ctx) {
590 level.Info(i.logger).Log("msg", "token verification successful", "ring", i.RingName)
591
592 err := i.changeState(ctx, ACTIVE)
593 if err != nil {
594 level.Error(i.logger).Log("msg", "failed to set state to ACTIVE", "ring", i.RingName, "err", err)
595 }
596 } else {
597 level.Info(i.logger).Log("msg", "token verification failed, observing", "ring", i.RingName)
598 // keep observing

Callers

nothing calls this directly

Calls 11

initRingMethod · 0.95
GetStateMethod · 0.95
autoJoinMethod · 0.95
verifyTokensMethod · 0.95
changeStateMethod · 0.95
updateConsulMethod · 0.95
newDisableableTickerFunction · 0.85
AfterMethod · 0.65
DoneMethod · 0.65
LogMethod · 0.45
ErrorMethod · 0.45

Tested by

no test coverage detected