MCPcopy
hub / github.com/grafana/dskit / running

Method running

kv/memberlist/memberlist_client.go:654–726  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

652var errFailedToJoinCluster = errors.New("failed to join memberlist cluster on startup")
653
654func (m *KV) running(ctx context.Context) error {
655 // The key notifications goroutine must be started as the very first thing, otherwise watch key notifications
656 // will be delayed. In particular, it must be started before the memberlist cluster full-join procedure (below)
657 // because it may take a long time to complete.
658 if m.cfg.NotifyInterval > 0 {
659 // Start delayed key notifications.
660 notifTicker := time.NewTicker(m.cfg.NotifyInterval)
661 defer notifTicker.Stop()
662 go m.monitorKeyNotifications(ctx, notifTicker.C)
663 }
664
665 ok := m.joinMembersOnStartup(ctx)
666 if !ok && m.cfg.AbortIfJoinFails {
667 return errFailedToJoinCluster
668 }
669
670 // Start propagation delay tracker after joining the cluster, so that the first
671 // WatchKey callback has the full cluster state and correctly skips pre-existing beacons.
672 if m.cfg.PropagationDelayTracker.Enabled {
673 m.propagationDelayTracker = NewPropagationDelayTracker(
674 m,
675 m.cfg.PropagationDelayTracker,
676 m.memberlist.LocalNode().Name,
677 m.logger,
678 m.registerer,
679 )
680 if err := m.propagationDelayTracker.StartAsync(ctx); err != nil {
681 level.Warn(m.logger).Log("msg", "failed to start propagation delay tracker", "err", err)
682 m.propagationDelayTracker = nil
683 }
684 }
685
686 var tickerChan <-chan time.Time
687 if m.cfg.RejoinInterval > 0 && len(m.cfg.GetRejoinSeedNodes()) > 0 {
688 // Use a random initial delay between 0 and RejoinInterval to uniformly
689 // distribute rejoins across time when multiple processes start simultaneously.
690 initialDelay := 1 + time.Duration(math_rand.Int63n(int64(m.cfg.RejoinInterval)))
691 var stop func()
692 stop, tickerChan = timeutil.NewVariableTicker(initialDelay, m.cfg.RejoinInterval)
693 defer stop()
694 }
695
696 var obsoleteEntriesTickerChan <-chan time.Time
697 if m.cfg.ObsoleteEntriesTimeout > 0 {
698 obsoleteEntriesTicker := time.NewTicker(m.cfg.ObsoleteEntriesTimeout)
699 defer obsoleteEntriesTicker.Stop()
700
701 obsoleteEntriesTickerChan = obsoleteEntriesTicker.C
702 }
703
704 logger := log.With(m.logger, "phase", "periodic_rejoin")
705 for {
706 select {
707 case <-tickerChan:
708 const numAttempts = 1 // don't retry if resolution fails, we will try again next time
709 reached, err := m.joinMembersWithRetries(ctx, m.cfg.GetRejoinSeedNodes(), numAttempts, logger)
710 if err == nil {
711 level.Info(logger).Log("msg", "re-joined memberlist cluster", "reached_nodes", reached)

Callers

nothing calls this directly

Calls 12

joinMembersOnStartupMethod · 0.95
NewVariableTickerFunction · 0.92
GetRejoinSeedNodesMethod · 0.80
WithMethod · 0.80
StopMethod · 0.65
StartAsyncMethod · 0.65
DoneMethod · 0.65
LogMethod · 0.45

Tested by

no test coverage detected