MCPcopy
hub / github.com/grafana/dskit / running

Method running

kv/memberlist/propagation_tracker.go:119–155  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

117}
118
119func (t *PropagationDelayTracker) running(ctx context.Context) error {
120 t.startupTime = time.Now().UnixMilli()
121 level.Info(t.logger).Log("msg", "propagation delay tracker started", "beacon_interval", t.cfg.BeaconInterval, "beacon_lifetime", t.cfg.BeaconLifetime)
122
123 // Start the goroutine to track beacon arrivals in real-time
124 watchCtx, cancelWatch := context.WithCancel(ctx)
125 defer cancelWatch()
126
127 watchDone := make(chan struct{})
128 go func() {
129 defer close(watchDone)
130 t.watchBeacons(watchCtx)
131 }()
132 defer func() {
133 select {
134 case <-watchDone:
135 case <-time.After(10 * time.Second):
136 level.Warn(t.logger).Log("msg", "timed out waiting for watch goroutine to finish")
137 }
138 }()
139
140 // Start the beacon publish ticker. The first tick uses a random delay between 1ns and
141 // beaconInterval to distribute beacon publishing over time when multiple processes
142 // start simultaneously (e.g., during a rollout).
143 initialDelay := 1 + time.Duration(rand.Int63n(int64(t.cfg.BeaconInterval)))
144 stopTicker, tickerChan := timeutil.NewVariableTicker(initialDelay, t.cfg.BeaconInterval)
145 defer stopTicker()
146
147 for {
148 select {
149 case <-ctx.Done():
150 return nil
151 case <-tickerChan:
152 t.onBeaconInterval(ctx)
153 }
154 }
155}
156
157// watchBeacons uses WatchKey to receive beacon updates in real-time and measure delay.
158// It loops with backoff until the context is canceled.

Callers

nothing calls this directly

Calls 6

watchBeaconsMethod · 0.95
onBeaconIntervalMethod · 0.95
NewVariableTickerFunction · 0.92
AfterMethod · 0.65
DoneMethod · 0.65
LogMethod · 0.45

Tested by

no test coverage detected