initRing is the first thing we do when we start. It adds an ingester entry to the ring and copies out our state and tokens if they exist.
(ctx context.Context)
| 679 | // - adds an ingester entry to the ring |
| 680 | // - copies out our state and tokens if they exist |
| 681 | func (i *Lifecycler) initRing(ctx context.Context) error { |
| 682 | var ( |
| 683 | ringDesc *Desc |
| 684 | tokensFromFile Tokens |
| 685 | err error |
| 686 | ) |
| 687 | |
| 688 | if i.cfg.TokensFilePath != "" { |
| 689 | tokensFromFile, err = LoadTokensFromFile(i.cfg.TokensFilePath) |
| 690 | if err != nil && !os.IsNotExist(err) { |
| 691 | level.Error(i.logger).Log("msg", "error loading tokens from file", "err", err) |
| 692 | } |
| 693 | } else { |
| 694 | level.Info(i.logger).Log("msg", "not loading tokens from file, tokens file path is empty") |
| 695 | } |
| 696 | |
| 697 | err = i.KVStore.CAS(ctx, i.RingKey, func(in interface{}) (out interface{}, retry bool, err error) { |
| 698 | ringDesc = GetOrCreateRingDesc(in) |
| 699 | |
| 700 | instanceDesc, ok := ringDesc.Ingesters[i.ID] |
| 701 | if !ok { |
| 702 | now := time.Now() |
| 703 | // The instance doesn't exist in the ring, so it's safe to set the registered timestamp as of now. |
| 704 | i.setRegisteredAt(now) |
| 705 | // Clear read-only state, and set last update time to "zero". |
| 706 | i.setReadOnlyState(false, time.Time{}) |
| 707 | |
| 708 | // We use the tokens from the file only if it does not exist in the ring yet. |
| 709 | if len(tokensFromFile) > 0 { |
| 710 | level.Info(i.logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile)) |
| 711 | if len(tokensFromFile) >= i.cfg.NumTokens { |
| 712 | i.setState(ACTIVE) |
| 713 | } |
| 714 | ro, rots := i.GetReadOnlyState() |
| 715 | ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), i.getRegisteredAt(), ro, rots, nil) |
| 716 | i.setTokens(tokensFromFile) |
| 717 | return ringDesc, true, nil |
| 718 | } |
| 719 | |
| 720 | // Either we are a new ingester, or consul must have restarted |
| 721 | level.Info(i.logger).Log("msg", "instance not found in ring, adding with no tokens", "ring", i.RingName) |
| 722 | ro, rots := i.GetReadOnlyState() |
| 723 | ringDesc.AddIngester(i.ID, i.Addr, i.Zone, []uint32{}, i.GetState(), i.getRegisteredAt(), ro, rots, nil) |
| 724 | return ringDesc, true, nil |
| 725 | } |
| 726 | |
| 727 | // The instance already exists in the ring, so we can't change the registered timestamp (even if it's zero) |
| 728 | // but we need to update the local state accordingly. |
| 729 | i.setRegisteredAt(instanceDesc.GetRegisteredAt()) |
| 730 | |
| 731 | // Set lifecycler read-only state from ring entry. We will not modify ring entry's read-only state. |
| 732 | i.setReadOnlyState(instanceDesc.GetReadOnlyState()) |
| 733 | |
| 734 | // If the ingester is in the JOINING state this means it crashed due to |
| 735 | // a failed token transfer or some other reason during startup. We want |
| 736 | // to set it back to PENDING in order to start the lifecycle from the |
| 737 | // beginning. |
| 738 | if instanceDesc.State == JOINING { |