While Stopping, we try to leave the memberlist cluster and then shut down the memberlist client. We do this in order to send out the last messages, typically that the ingester has LEFT the ring.
(_ error)
| 974 | // stopping runs the shutdown sequence for the Stopping state: it stops the propagation delay tracker (if set), waits for queued locally-generated broadcasts to drain, leaves the memberlist cluster, closes m.shutdown and finally shuts down the memberlist client. |
| 975 | // Leaving before shutting down lets the last messages go out, typically that an ingester has LEFT the ring. The incoming error is ignored; every failure here is only logged and the method always returns nil. |
| 976 | func (m *KV) stopping(_ error) error { |
| 977 | // Stop the propagation delay tracker if one is set (non-nil) and wait for it to terminate; a failure to terminate is only logged as a warning. |
| 978 | if m.propagationDelayTracker != nil { |
| 979 | m.propagationDelayTracker.StopAsync() |
| 980 | if err := m.propagationDelayTracker.AwaitTerminated(context.Background()); err != nil { |
| 981 | level.Warn(m.logger).Log("msg", "error stopping propagation delay tracker", "err", err) |
| 982 | } |
| 983 | } |
| 984 | |
| 985 | level.Info(m.logger).Log("msg", "leaving memberlist cluster") |
| 986 | |
| 987 | // Poll every 250ms until the queue of locally-generated messages is empty, giving up once BroadcastTimeoutForLocalUpdatesOnShutdown has elapsed. |
| 988 | // Also stop waiting as soon as at most one member remains. A timeout <= 0 disables the deadline, so the wait then ends only when the queue drains or the member count drops. |
| 989 | // Note: Once we enter Stopping state, we don't queue more locally-generated messages. |
| 990 | |
| 991 | deadline := time.Now().Add(m.cfg.BroadcastTimeoutForLocalUpdatesOnShutdown) |
| 992 | |
| 993 | msgs := m.localBroadcasts.NumQueued() |
| 994 | nodes := m.memberlist.NumMembers() |
| 995 | for msgs > 0 && nodes > 1 && (m.cfg.BroadcastTimeoutForLocalUpdatesOnShutdown <= 0 || time.Now().Before(deadline)) { |
| 996 | level.Info(m.logger).Log("msg", "waiting for locally-generated broadcast messages to be sent out", "count", msgs, "nodes", nodes) |
| 997 | time.Sleep(250 * time.Millisecond) |
| 998 | |
| 999 | msgs = m.localBroadcasts.NumQueued() |
| 1000 | nodes = m.memberlist.NumMembers() |
| 1001 | } |
| 1002 | |
| 1003 | if msgs > 0 { |
| 1004 | level.Warn(m.logger).Log("msg", "locally-generated broadcast messages left the queue", "count", msgs, "nodes", nodes) |
| 1005 | } |
| 1006 | |
| 1007 | err := m.memberlist.Leave(m.cfg.LeaveTimeout) |
| 1008 | if err != nil { |
| 1009 | level.Error(m.logger).Log("msg", "error when leaving memberlist cluster", "err", err) |
| 1010 | } |
| 1011 | |
| 1012 | close(m.shutdown) |
| 1013 | |
| 1014 | err = m.memberlist.Shutdown() |
| 1015 | if err != nil { |
| 1016 | level.Error(m.logger).Log("msg", "error when shutting down memberlist client", "err", err) |
| 1017 | } |
| 1018 | return nil |
| 1019 | } |
| 1020 | |
| 1021 | // List returns all known keys under a given prefix. |
| 1022 | // No communication with other nodes in the cluster is done here. |