ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester. For this method to work correctly (especially when using gossiping), source ingester (specified by ingesterID) must be in the LEAVING state, otherwise ring's merge function may detect token conflict and
(ctx context.Context, ingesterID string)
| 438 | // assign token to the wrong ingester. While we could check for that state here, when this method is called, |
| 439 | // transfers have already finished -- it's better to check for this *before* transfers start. |
| 440 | func (i *Lifecycler) ClaimTokensFor(ctx context.Context, ingesterID string) error { |
| 441 | errCh := make(chan error) |
| 442 | |
| 443 | fn := func() { |
| 444 | var tokens Tokens |
| 445 | |
| 446 | claimTokens := func(in interface{}) (out interface{}, retry bool, err error) { |
| 447 | ringDesc, ok := in.(*Desc) |
| 448 | if !ok || ringDesc == nil { |
| 449 | return nil, false, fmt.Errorf("cannot claim tokens in an empty ring") |
| 450 | } |
| 451 | |
| 452 | tokens = ringDesc.ClaimTokens(ingesterID, i.ID) |
| 453 | // update timestamp to give gossiping client a chance register ring change. |
| 454 | ing := ringDesc.Ingesters[i.ID] |
| 455 | ing.Timestamp = time.Now().Unix() |
| 456 | |
| 457 | // Tokens of the leaving ingester may have been generated by an older version which |
| 458 | // doesn't guarantee sorted tokens, so we enforce sorting here. |
| 459 | sort.Sort(tokens) |
| 460 | ing.Tokens = tokens |
| 461 | |
| 462 | ringDesc.Ingesters[i.ID] = ing |
| 463 | return ringDesc, true, nil |
| 464 | } |
| 465 | |
| 466 | if err := i.KVStore.CAS(ctx, i.RingKey, claimTokens); err != nil { |
| 467 | level.Error(i.logger).Log("msg", "Failed to write to the KV store", "ring", i.RingName, "err", err) |
| 468 | } |
| 469 | |
| 470 | i.setTokens(tokens) |
| 471 | errCh <- nil |
| 472 | } |
| 473 | |
| 474 | if err := i.sendToLifecyclerLoop(fn); err != nil { |
| 475 | return err |
| 476 | } |
| 477 | return <-errCh |
| 478 | } |
| 479 | |
| 480 | // HealthyInstancesCount returns the number of healthy instances for the Write operation |
| 481 | // in the ring, updated during the last heartbeat period. |
nothing calls this directly
no test coverage detected