MCPcopy
hub / github.com/grafana/dskit / stopping

Method stopping

kv/memberlist/memberlist_client.go:976–1019  ·  view source on GitHub ↗

While Stopping, we try to leave memberlist cluster and then shutdown memberlist client. We do this in order to send out last messages, typically that ingester has LEFT the ring.

(_ error)

Source from the content-addressed store, hash-verified

974// While Stopping, we try to leave memberlist cluster and then shutdown memberlist client.
975// We do this in order to send out last messages, typically that ingester has LEFT the ring.
976func (m *KV) stopping(_ error) error {
977 // Stop propagation delay tracker if running.
978 if m.propagationDelayTracker != nil {
979 m.propagationDelayTracker.StopAsync()
980 if err := m.propagationDelayTracker.AwaitTerminated(context.Background()); err != nil {
981 level.Warn(m.logger).Log("msg", "error stopping propagation delay tracker", "err", err)
982 }
983 }
984
985 level.Info(m.logger).Log("msg", "leaving memberlist cluster")
986
987 // Wait until queue with locally-generated messages is empty, but don't wait for too long.
988 // Also don't wait if there is just one node left.
989 // Note: Once we enter Stopping state, we don't queue more locally-generated messages.
990
991 deadline := time.Now().Add(m.cfg.BroadcastTimeoutForLocalUpdatesOnShutdown)
992
993 msgs := m.localBroadcasts.NumQueued()
994 nodes := m.memberlist.NumMembers()
995 for msgs > 0 && nodes > 1 && (m.cfg.BroadcastTimeoutForLocalUpdatesOnShutdown <= 0 || time.Now().Before(deadline)) {
996 level.Info(m.logger).Log("msg", "waiting for locally-generated broadcast messages to be sent out", "count", msgs, "nodes", nodes)
997 time.Sleep(250 * time.Millisecond)
998
999 msgs = m.localBroadcasts.NumQueued()
1000 nodes = m.memberlist.NumMembers()
1001 }
1002
1003 if msgs > 0 {
1004 level.Warn(m.logger).Log("msg", "locally-generated broadcast messages left the queue", "count", msgs, "nodes", nodes)
1005 }
1006
1007 err := m.memberlist.Leave(m.cfg.LeaveTimeout)
1008 if err != nil {
1009 level.Error(m.logger).Log("msg", "error when leaving memberlist cluster", "err", err)
1010 }
1011
1012 close(m.shutdown)
1013
1014 err = m.memberlist.Shutdown()
1015 if err != nil {
1016 level.Error(m.logger).Log("msg", "error when shutting down memberlist client", "err", err)
1017 }
1018 return nil
1019}
1020
1021// List returns all known keys under a given prefix.
1022// No communication with other nodes in the cluster is done here.

Callers

nothing calls this directly

Calls 8

StopAsyncMethod · 0.65
AwaitTerminatedMethod · 0.65
AddMethod · 0.65
BeforeMethod · 0.65
SleepMethod · 0.65
LogMethod · 0.45
ErrorMethod · 0.45
ShutdownMethod · 0.45

Tested by

no test coverage detected