(ctx context.Context)
| 309 | } |
| 310 | |
| 311 | func (t *PropagationDelayTracker) publishBeacon(ctx context.Context) { |
| 312 | // Generate a non-zero beacon ID. We need beaconID > 0 because 0 is used as |
| 313 | // the "no pending beacon" sentinel value in pendingBeaconID. |
| 314 | beaconID := rand.Uint64() |
| 315 | if beaconID == 0 { |
| 316 | beaconID = 1 |
| 317 | } |
| 318 | now := time.Now() |
| 319 | |
| 320 | // Set pendingBeaconID before marking as seen to prevent cleanupSeenBeacons |
| 321 | // from removing it during the CAS operation window. |
| 322 | t.pendingBeaconID.Store(beaconID) |
| 323 | defer t.pendingBeaconID.Store(0) |
| 324 | |
| 325 | // Mark as seen before publishing to avoid measuring our own beacon. |
| 326 | t.markAsSeen(beaconID) |
| 327 | |
| 328 | err := t.kv.CAS(ctx, propagationDelayTrackerKey, t.codec, func(in interface{}) (out interface{}, retry bool, err error) { |
| 329 | desc := GetOrCreatePropagationDelayTrackerDesc(in) |
| 330 | |
| 331 | // Skip publishing if we already have a beacon with the same ID (extremely rare beacon ID collision). |
| 332 | // In case of a collision, the beacon ID is tracked as seen anyway, but we don't care given it's a rare case. |
| 333 | if _, exists := desc.Beacons[beaconID]; exists { |
| 334 | return nil, false, nil |
| 335 | } |
| 336 | |
| 337 | desc.Beacons[beaconID] = BeaconDesc{ |
| 338 | PublishedAt: now.UnixMilli(), |
| 339 | PublishedBy: t.nodeName, |
| 340 | } |
| 341 | |
| 342 | return desc, true, nil |
| 343 | }) |
| 344 | |
| 345 | if err != nil { |
| 346 | level.Warn(t.logger).Log("msg", "failed to publish beacon", "err", err) |
| 347 | return |
| 348 | } |
| 349 | |
| 350 | t.beaconsPublishedTotal.Inc() |
| 351 | } |
| 352 | |
| 353 | // deleteBeacons marks the specified beacons as tombstones by setting their DeletedAt timestamp. |
| 354 | func (t *PropagationDelayTracker) deleteBeacons(ctx context.Context, beaconIDs []uint64) { |
no test coverage detected