updateConsul updates our entries in consul, heartbeating and dealing with consul restarts.
(ctx context.Context)
| 948 | // updateConsul updates our entries in consul, heartbeating and dealing with |
| 949 | // consul restarts. |
| 950 | func (i *Lifecycler) updateConsul(ctx context.Context) error { |
| 951 | var ringDesc *Desc |
| 952 | |
| 953 | err := i.KVStore.CAS(ctx, i.RingKey, func(in interface{}) (out interface{}, retry bool, err error) { |
| 954 | ringDesc = GetOrCreateRingDesc(in) |
| 955 | |
| 956 | var tokens Tokens |
| 957 | instanceDesc, exists := ringDesc.Ingesters[i.ID] |
| 958 | |
| 959 | if !exists { |
| 960 | // If the instance is missing in the ring, we need to add it back. However, due to how shuffle sharding work, |
| 961 | // the missing instance for some period of time could have cause a resharding of tenants among instances: |
| 962 | // to guarantee query correctness we need to update the registration timestamp to current time. |
| 963 | level.Info(i.logger).Log("msg", "instance is missing in the ring (e.g. the ring backend storage has been reset), registering the instance with an updated registration timestamp", "ring", i.RingName) |
| 964 | i.setRegisteredAt(time.Now()) |
| 965 | tokens = i.getTokens() |
| 966 | } else { |
| 967 | tokens = instanceDesc.Tokens |
| 968 | } |
| 969 | |
| 970 | ro, rots := i.GetReadOnlyState() |
| 971 | ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokens, i.GetState(), i.getRegisteredAt(), ro, rots, nil) |
| 972 | return ringDesc, true, nil |
| 973 | }) |
| 974 | |
| 975 | // Update counters |
| 976 | if err == nil { |
| 977 | i.updateCounters(ringDesc) |
| 978 | } |
| 979 | |
| 980 | return err |
| 981 | } |
| 982 | |
| 983 | // changeState updates consul with state transitions for us. NB this must be |
| 984 | // called from loop()! Use ChangeState for calls from outside of loop(). |
no test coverage detected