MCPcopy
hub / github.com/grafana/dskit / stopping

Method stopping

ring/lifecycler.go:622–676  ·  view source on GitHub ↗

Shutdown the lifecycle. It will: - send chunks to another ingester, if it can. - otherwise, flush chunks to the chunk store. - remove config from Consul.

(runningError error)

Source from the content-addressed store, hash-verified

620// - otherwise, flush chunks to the chunk store.
621// - remove config from Consul.
622func (i *Lifecycler) stopping(runningError error) error {
623 if runningError != nil {
624 // previously lifecycler just called os.Exit (from loop method)...
625 // now it stops more gracefully, but also without doing any cleanup
626 level.Error(i.logger).Log("msg", "lifecycler loop() exited with error", "err", runningError)
627 return runningError
628 }
629
630 heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(i.cfg.HeartbeatPeriod)
631 defer heartbeatTickerStop()
632
633 // Mark ourselved as Leaving so no more samples are send to us.
634 err := i.changeState(context.Background(), LEAVING)
635 if err != nil {
636 level.Error(i.logger).Log("msg", "failed to set state to LEAVING", "ring", i.RingName, "err", err)
637 }
638
639 // Do the transferring / flushing on a background goroutine so we can continue
640 // to heartbeat to consul.
641 done := make(chan struct{})
642 go func() {
643 i.processShutdown(context.Background())
644 close(done)
645 }()
646
647heartbeatLoop:
648 for {
649 select {
650 case <-heartbeatTickerChan:
651 i.lifecyclerMetrics.consulHeartbeats.Inc()
652 if err := i.updateConsul(context.Background()); err != nil {
653 level.Error(i.logger).Log("msg", "failed to write to the KV store, sleeping", "ring", i.RingName, "err", err)
654 }
655
656 case <-done:
657 break heartbeatLoop
658 }
659 }
660
661 if i.ShouldUnregisterOnShutdown() {
662 if err := i.unregister(context.Background()); err != nil {
663 return errors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName)
664 }
665 level.Info(i.logger).Log("msg", "instance removed from the KV store", "ring", i.RingName)
666 }
667
668 if i.cfg.TokensFilePath != "" && i.ClearTokensOnShutdown() {
669 if err := os.Remove(i.cfg.TokensFilePath); err != nil {
670 return errors.Wrapf(err, "failed to delete tokens file %s", i.cfg.TokensFilePath)
671 }
672 level.Info(i.logger).Log("msg", "removed tokens file from disk", "path", i.cfg.TokensFilePath)
673 }
674
675 return nil
676}
677
678// initRing is the first thing we do when we start. It:
679// - adds an ingester entry to the ring

Callers

nothing calls this directly

Calls 9

changeStateMethod · 0.95
processShutdownMethod · 0.95
updateConsulMethod · 0.95
unregisterMethod · 0.95
ClearTokensOnShutdownMethod · 0.95
newDisableableTickerFunction · 0.85
LogMethod · 0.45
ErrorMethod · 0.45

Tested by

no test coverage detected