| 281 | } |
| 282 | |
| 283 | func (i *Lifecycler) checkRingHealthForReadiness(ctx context.Context) error { |
| 284 | // Ensure the instance holds some tokens. |
| 285 | if len(i.getTokens()) == 0 { |
| 286 | return fmt.Errorf("this instance owns no tokens") |
| 287 | } |
| 288 | |
| 289 | // If ring health checking is enabled we make sure all instances in the ring are ACTIVE and healthy, |
| 290 | // otherwise we just check this instance. |
| 291 | desc, err := i.KVStore.Get(ctx, i.RingKey) |
| 292 | if err != nil { |
| 293 | level.Error(i.logger).Log("msg", "error talking to the KV store", "ring", i.RingName, "err", err) |
| 294 | return fmt.Errorf("error talking to the KV store: %s", err) |
| 295 | } |
| 296 | |
| 297 | ringDesc, ok := desc.(*Desc) |
| 298 | if !ok || ringDesc == nil { |
| 299 | return fmt.Errorf("no ring returned from the KV store") |
| 300 | } |
| 301 | |
| 302 | if i.cfg.ReadinessCheckRingHealth { |
| 303 | if err := ringDesc.IsReady(time.Now(), i.cfg.RingConfig.HeartbeatTimeout); err != nil { |
| 304 | level.Warn(i.logger).Log("msg", "found an existing instance(s) with a problem in the ring, "+ |
| 305 | "this instance cannot become ready until this problem is resolved. "+ |
| 306 | "The /ring http endpoint on the distributor (or single binary) provides visibility into the ring.", |
| 307 | "ring", i.RingName, "err", err) |
| 308 | return err |
| 309 | } |
| 310 | } else { |
| 311 | instance, ok := ringDesc.Ingesters[i.ID] |
| 312 | if !ok { |
| 313 | return fmt.Errorf("instance %s not found in the ring", i.ID) |
| 314 | } |
| 315 | |
| 316 | if err := instance.IsReady(time.Now(), i.cfg.RingConfig.HeartbeatTimeout); err != nil { |
| 317 | return err |
| 318 | } |
| 319 | } |
| 320 | |
| 321 | return nil |
| 322 | } |
| 323 | |
| 324 | // GetState returns the state of this ingester. |
| 325 | func (i *Lifecycler) GetState() InstanceState { |