(ctx context.Context)
var (
	// errFailedToJoinCluster is returned by (*KV).running when the initial
	// attempt to join the memberlist cluster (joinMembersOnStartup) reports
	// failure and the AbortIfJoinFails option is enabled.
	errFailedToJoinCluster = errors.New("failed to join memberlist cluster on startup")
)
| 653 | |
| 654 | func (m *KV) running(ctx context.Context) error { |
| 655 | // The key notifications goroutine must be started as the very first thing, otherwise watch key notifications |
| 656 | // will be delayed. In particular, it must be started before the memberlist cluster full-join procedure (below) |
| 657 | // because it may take a long time to complete. |
| 658 | if m.cfg.NotifyInterval > 0 { |
| 659 | // Start delayed key notifications. |
| 660 | notifTicker := time.NewTicker(m.cfg.NotifyInterval) |
| 661 | defer notifTicker.Stop() |
| 662 | go m.monitorKeyNotifications(ctx, notifTicker.C) |
| 663 | } |
| 664 | |
| 665 | ok := m.joinMembersOnStartup(ctx) |
| 666 | if !ok && m.cfg.AbortIfJoinFails { |
| 667 | return errFailedToJoinCluster |
| 668 | } |
| 669 | |
| 670 | // Start propagation delay tracker after joining the cluster, so that the first |
| 671 | // WatchKey callback has the full cluster state and correctly skips pre-existing beacons. |
| 672 | if m.cfg.PropagationDelayTracker.Enabled { |
| 673 | m.propagationDelayTracker = NewPropagationDelayTracker( |
| 674 | m, |
| 675 | m.cfg.PropagationDelayTracker, |
| 676 | m.memberlist.LocalNode().Name, |
| 677 | m.logger, |
| 678 | m.registerer, |
| 679 | ) |
| 680 | if err := m.propagationDelayTracker.StartAsync(ctx); err != nil { |
| 681 | level.Warn(m.logger).Log("msg", "failed to start propagation delay tracker", "err", err) |
| 682 | m.propagationDelayTracker = nil |
| 683 | } |
| 684 | } |
| 685 | |
| 686 | var tickerChan <-chan time.Time |
| 687 | if m.cfg.RejoinInterval > 0 && len(m.cfg.GetRejoinSeedNodes()) > 0 { |
| 688 | // Use a random initial delay between 0 and RejoinInterval to uniformly |
| 689 | // distribute rejoins across time when multiple processes start simultaneously. |
| 690 | initialDelay := 1 + time.Duration(math_rand.Int63n(int64(m.cfg.RejoinInterval))) |
| 691 | var stop func() |
| 692 | stop, tickerChan = timeutil.NewVariableTicker(initialDelay, m.cfg.RejoinInterval) |
| 693 | defer stop() |
| 694 | } |
| 695 | |
| 696 | var obsoleteEntriesTickerChan <-chan time.Time |
| 697 | if m.cfg.ObsoleteEntriesTimeout > 0 { |
| 698 | obsoleteEntriesTicker := time.NewTicker(m.cfg.ObsoleteEntriesTimeout) |
| 699 | defer obsoleteEntriesTicker.Stop() |
| 700 | |
| 701 | obsoleteEntriesTickerChan = obsoleteEntriesTicker.C |
| 702 | } |
| 703 | |
| 704 | logger := log.With(m.logger, "phase", "periodic_rejoin") |
| 705 | for { |
| 706 | select { |
| 707 | case <-tickerChan: |
| 708 | const numAttempts = 1 // don't retry if resolution fails, we will try again next time |
| 709 | reached, err := m.joinMembersWithRetries(ctx, m.cfg.GetRejoinSeedNodes(), numAttempts, logger) |
| 710 | if err == nil { |
| 711 | level.Info(logger).Log("msg", "re-joined memberlist cluster", "reached_nodes", reached) |
nothing calls this directly
no test coverage detected