registerInstance registers the instance in the ring. The initial state and set of tokens depend on the OnRingInstanceRegister() delegate function.
(ctx context.Context)
| 311 | // registerInstance registers the instance in the ring. The initial state and set of tokens |
| 312 | // depend on the OnRingInstanceRegister() delegate function. |
| 313 | func (l *BasicLifecycler) registerInstance(ctx context.Context) error { |
| 314 | var instanceDesc InstanceDesc |
| 315 | |
| 316 | err := l.store.CAS(ctx, l.ringKey, func(in interface{}) (out interface{}, retry bool, err error) { |
| 317 | ringDesc := GetOrCreateRingDesc(in) |
| 318 | |
| 319 | var exists bool |
| 320 | instanceDesc, exists = ringDesc.Ingesters[l.cfg.ID] |
| 321 | if exists { |
| 322 | level.Info(l.logger).Log("msg", "instance found in the ring", "instance", l.cfg.ID, "ring", l.ringName, "state", instanceDesc.GetState(), "tokens", len(instanceDesc.GetTokens()), "registered_at", instanceDesc.GetRegisteredAt().String(), "last_heartbeat_at", instanceDesc.GetLastHeartbeatAt().String()) |
| 323 | } else { |
| 324 | level.Info(l.logger).Log("msg", "instance not found in the ring", "instance", l.cfg.ID, "ring", l.ringName) |
| 325 | } |
| 326 | |
| 327 | // We call the delegate to get the desired state right after the initialization. |
| 328 | state, tokens := l.delegate.OnRingInstanceRegister(l, *ringDesc, exists, l.cfg.ID, instanceDesc) |
| 329 | |
| 330 | // Ensure tokens are sorted. |
| 331 | sort.Sort(tokens) |
| 332 | |
| 333 | // If the instance didn't already exist, then we can safely set the registered timestamp to "now", |
| 334 | // otherwise we have to honor the previous value (even if it was zero, because it means it was unknown |
| 335 | // but it's definitely not "now"). |
| 336 | var registeredAt time.Time |
| 337 | if exists { |
| 338 | registeredAt = instanceDesc.GetRegisteredAt() |
| 339 | } else { |
| 340 | registeredAt = time.Now() |
| 341 | } |
| 342 | |
| 343 | readOnly, readOnlyUpdatedTimestamp := instanceDesc.GetReadOnlyState() |
| 344 | if readOnlyUpdatedTimestamp.IsZero() { |
| 345 | // Store the zero timestamp as 0 in the ring, not as -62135596800. |
| 346 | readOnlyUpdatedTimestamp = time.Unix(0, 0) |
| 347 | } |
| 348 | // Always overwrite the instance in the ring (even if already exists) because some properties |
| 349 | // may have changed (state, tokens, zone, address) and even if they didn't the heartbeat at |
| 350 | // least did. |
| 351 | instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, tokens, state, registeredAt, readOnly, readOnlyUpdatedTimestamp, l.cfg.Versions) |
| 352 | return ringDesc, true, nil |
| 353 | }) |
| 354 | |
| 355 | if err != nil { |
| 356 | return err |
| 357 | } |
| 358 | |
| 359 | l.currState.Lock() |
| 360 | l.currInstanceDesc = &instanceDesc |
| 361 | l.currState.Unlock() |
| 362 | |
| 363 | // Initialize the read-only metric to reflect the current state after registration. |
| 364 | if instanceDesc.ReadOnly { |
| 365 | l.metrics.readOnly.Set(1) |
| 366 | } else { |
| 367 | l.metrics.readOnly.Set(0) |
| 368 | } |
| 369 | |
| 370 | return nil |
no test coverage detected