watchBeacons uses WatchKey to receive beacon updates in real-time and measure delay. It loops with backoff until the context is canceled.
(ctx context.Context)
| 157 | // watchBeacons uses WatchKey to receive beacon updates in real-time and measure delay. |
| 158 | // It loops with backoff until the context is canceled. |
| 159 | func (t *PropagationDelayTracker) watchBeacons(ctx context.Context) { |
| 160 | boff := backoff.New(ctx, backoff.Config{ |
| 161 | MinBackoff: 100 * time.Millisecond, |
| 162 | MaxBackoff: 5 * time.Second, |
| 163 | MaxRetries: 0, // Retry indefinitely until context is canceled. |
| 164 | }) |
| 165 | |
| 166 | for boff.Ongoing() { |
| 167 | t.kv.WatchKey(ctx, propagationDelayTrackerKey, t.codec, func(val interface{}) bool { |
| 168 | if val == nil { |
| 169 | return true |
| 170 | } |
| 171 | |
| 172 | desc, ok := val.(*PropagationDelayTrackerDesc) |
| 173 | if !ok { |
| 174 | level.Warn(t.logger).Log("msg", "unexpected value type in watch callback", "type", fmt.Sprintf("%T", val)) |
| 175 | return true |
| 176 | } |
| 177 | |
| 178 | t.onBeaconsReceived(desc) |
| 179 | return true |
| 180 | }) |
| 181 | |
| 182 | // WatchKey exited, wait before retrying. |
| 183 | boff.Wait() |
| 184 | } |
| 185 | } |
| 186 | |
| 187 | // onBeaconsReceived processes received beacons and records delay for unseen beacons. |
| 188 | func (t *PropagationDelayTracker) onBeaconsReceived(desc *PropagationDelayTrackerDesc) { |