(ctx context.Context)
| 539 | } |
| 540 | |
| 541 | func (i *Lifecycler) loop(ctx context.Context) error { |
| 542 | // First, see if we exist in the cluster, update our state to match if we do, |
| 543 | // and add ourselves (without tokens) if we don't. |
| 544 | if err := i.initRing(ctx); err != nil { |
| 545 | return errors.Wrapf(err, "failed to join the ring %s", i.RingName) |
| 546 | } |
| 547 | |
| 548 | // We do various period tasks |
| 549 | autoJoinAfter := time.After(i.cfg.JoinAfter) |
| 550 | var observeChan <-chan time.Time |
| 551 | |
| 552 | heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(i.cfg.HeartbeatPeriod) |
| 553 | defer heartbeatTickerStop() |
| 554 | |
| 555 | for { |
| 556 | select { |
| 557 | case <-autoJoinAfter: |
| 558 | level.Debug(i.logger).Log("msg", "JoinAfter expired", "ring", i.RingName) |
| 559 | // Will only fire once, after auto join timeout. If we haven't entered "JOINING" state, |
| 560 | // then pick some tokens and enter ACTIVE state. |
| 561 | if i.GetState() == PENDING { |
| 562 | level.Info(i.logger).Log("msg", "auto-joining cluster after timeout", "ring", i.RingName) |
| 563 | |
| 564 | if i.cfg.ObservePeriod > 0 { |
| 565 | // let's observe the ring. By using JOINING state, this ingester will be ignored by LEAVING |
| 566 | // ingesters, but we also signal that it is not fully functional yet. |
| 567 | if err := i.autoJoin(ctx, JOINING); err != nil { |
| 568 | return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName) |
| 569 | } |
| 570 | |
| 571 | level.Info(i.logger).Log("msg", "observing tokens before going ACTIVE", "ring", i.RingName) |
| 572 | observeChan = time.After(i.cfg.ObservePeriod) |
| 573 | } else { |
| 574 | if err := i.autoJoin(ctx, ACTIVE); err != nil { |
| 575 | return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName) |
| 576 | } |
| 577 | } |
| 578 | } |
| 579 | |
| 580 | case <-observeChan: |
| 581 | // if observeChan is nil, this case is ignored. We keep updating observeChan while observing the ring. |
| 582 | // When observing is done, observeChan is set to nil. |
| 583 | |
| 584 | observeChan = nil |
| 585 | if s := i.GetState(); s != JOINING { |
| 586 | level.Error(i.logger).Log("msg", "unexpected state while observing tokens", "state", s, "ring", i.RingName) |
| 587 | } |
| 588 | |
| 589 | if i.verifyTokens(ctx) { |
| 590 | level.Info(i.logger).Log("msg", "token verification successful", "ring", i.RingName) |
| 591 | |
| 592 | err := i.changeState(ctx, ACTIVE) |
| 593 | if err != nil { |
| 594 | level.Error(i.logger).Log("msg", "failed to set state to ACTIVE", "ring", i.RingName, "err", err) |
| 595 | } |
| 596 | } else { |
| 597 | level.Info(i.logger).Log("msg", "token verification failed, observing", "ring", i.RingName) |
| 598 | // keep observing |
nothing calls this directly
no test coverage detected