Shut down the lifecycle. It will: - send chunks to another ingester, if it can. - otherwise, flush chunks to the chunk store. - remove config from Consul.
(runningError error)
| 620 | // - otherwise, flush chunks to the chunk store. |
| 621 | // - remove config from Consul. |
| 622 | func (i *Lifecycler) stopping(runningError error) error { |
| 623 | if runningError != nil { |
| 624 | // previously lifecycler just called os.Exit (from loop method)... |
| 625 | // now it stops more gracefully, but also without doing any cleanup |
| 626 | level.Error(i.logger).Log("msg", "lifecycler loop() exited with error", "err", runningError) |
| 627 | return runningError |
| 628 | } |
| 629 | |
| 630 | heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(i.cfg.HeartbeatPeriod) |
| 631 | defer heartbeatTickerStop() |
| 632 | |
| 633 | // Mark ourselved as Leaving so no more samples are send to us. |
| 634 | err := i.changeState(context.Background(), LEAVING) |
| 635 | if err != nil { |
| 636 | level.Error(i.logger).Log("msg", "failed to set state to LEAVING", "ring", i.RingName, "err", err) |
| 637 | } |
| 638 | |
| 639 | // Do the transferring / flushing on a background goroutine so we can continue |
| 640 | // to heartbeat to consul. |
| 641 | done := make(chan struct{}) |
| 642 | go func() { |
| 643 | i.processShutdown(context.Background()) |
| 644 | close(done) |
| 645 | }() |
| 646 | |
| 647 | heartbeatLoop: |
| 648 | for { |
| 649 | select { |
| 650 | case <-heartbeatTickerChan: |
| 651 | i.lifecyclerMetrics.consulHeartbeats.Inc() |
| 652 | if err := i.updateConsul(context.Background()); err != nil { |
| 653 | level.Error(i.logger).Log("msg", "failed to write to the KV store, sleeping", "ring", i.RingName, "err", err) |
| 654 | } |
| 655 | |
| 656 | case <-done: |
| 657 | break heartbeatLoop |
| 658 | } |
| 659 | } |
| 660 | |
| 661 | if i.ShouldUnregisterOnShutdown() { |
| 662 | if err := i.unregister(context.Background()); err != nil { |
| 663 | return errors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName) |
| 664 | } |
| 665 | level.Info(i.logger).Log("msg", "instance removed from the KV store", "ring", i.RingName) |
| 666 | } |
| 667 | |
| 668 | if i.cfg.TokensFilePath != "" && i.ClearTokensOnShutdown() { |
| 669 | if err := os.Remove(i.cfg.TokensFilePath); err != nil { |
| 670 | return errors.Wrapf(err, "failed to delete tokens file %s", i.cfg.TokensFilePath) |
| 671 | } |
| 672 | level.Info(i.logger).Log("msg", "removed tokens file from disk", "path", i.cfg.TokensFilePath) |
| 673 | } |
| 674 | |
| 675 | return nil |
| 676 | } |
| 677 | |
| 678 | // initRing is the first thing we do when we start. It: |
| 679 | // - adds an ingester entry to the ring |
nothing calls this directly
no test coverage detected