(ctx context.Context)
| 117 | } |
| 118 | |
| 119 | func (t *PropagationDelayTracker) running(ctx context.Context) error { |
| 120 | t.startupTime = time.Now().UnixMilli() |
| 121 | level.Info(t.logger).Log("msg", "propagation delay tracker started", "beacon_interval", t.cfg.BeaconInterval, "beacon_lifetime", t.cfg.BeaconLifetime) |
| 122 | |
| 123 | // Start the goroutine to track beacon arrivals in real-time |
| 124 | watchCtx, cancelWatch := context.WithCancel(ctx) |
| 125 | defer cancelWatch() |
| 126 | |
| 127 | watchDone := make(chan struct{}) |
| 128 | go func() { |
| 129 | defer close(watchDone) |
| 130 | t.watchBeacons(watchCtx) |
| 131 | }() |
| 132 | defer func() { |
| 133 | select { |
| 134 | case <-watchDone: |
| 135 | case <-time.After(10 * time.Second): |
| 136 | level.Warn(t.logger).Log("msg", "timed out waiting for watch goroutine to finish") |
| 137 | } |
| 138 | }() |
| 139 | |
| 140 | // Start the beacon publish ticker. The first tick uses a random delay between 1ns and |
| 141 | // beaconInterval to distribute beacon publishing over time when multiple processes |
| 142 | // start simultaneously (e.g., during a rollout). |
| 143 | initialDelay := 1 + time.Duration(rand.Int63n(int64(t.cfg.BeaconInterval))) |
| 144 | stopTicker, tickerChan := timeutil.NewVariableTicker(initialDelay, t.cfg.BeaconInterval) |
| 145 | defer stopTicker() |
| 146 | |
| 147 | for { |
| 148 | select { |
| 149 | case <-ctx.Done(): |
| 150 | return nil |
| 151 | case <-tickerChan: |
| 152 | t.onBeaconInterval(ctx) |
| 153 | } |
| 154 | } |
| 155 | } |
| 156 | |
| 157 | // watchBeacons uses WatchKey to receive beacon updates in real-time and measure delay. |
| 158 | // It loops with backoff until the context is canceled. |
nothing calls this directly
no test coverage detected