MCPcopy
hub / github.com/grafana/dskit / cleanUnhealthy

Method cleanUnhealthy

ring/client/pool.go:261–302  ·  view source on GitHub ↗

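cleanUnhealthy health-checks every registered server and removes any whose health check has been failing for at least p.cfg.HealthCheckGracePeriod; a failure inside the grace period is only logged. The health checks are executed concurrently, with p.cfg.MaxConcurrentHealthChecks as the concurrency limit.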

()

Source from the content-addressed store, hash-verified

259// cleanUnhealthy loops through all servers and deletes any that fail a healthcheck.
260// The health checks are executed concurrently with p.cfg.MaxConcurrentHealthChecks.
261func (p *Pool) cleanUnhealthy() {
262 addresses := p.RegisteredAddresses()
263 _ = concurrency.ForEachJob(context.Background(), len(addresses), p.cfg.MaxConcurrentHealthChecks, func(ctx context.Context, idx int) error {
264 addr := addresses[idx]
265 member, ok := p.fromCache(addr)
266 // not ok means someone removed a client between the start of this loop and now
267 if !ok {
268 return nil
269 }
270
271 err := healthCheck(ctx, member.client, p.cfg.HealthCheckTimeout)
272 if err == nil {
273 member.firstFailedHealthCheck = time.Time{}
274 return nil
275 }
276
277 if member.firstFailedHealthCheck.IsZero() {
278 member.firstFailedHealthCheck = time.Now()
279 }
280
281 if time.Since(member.firstFailedHealthCheck) >= p.cfg.HealthCheckGracePeriod {
282 level.Warn(p.logger).Log(
283 "msg", fmt.Sprintf("removing %s failing healthcheck", p.clientName),
284 "addr", addr,
285 "reason", err,
286 "first_failed_at", member.firstFailedHealthCheck,
287 )
288 p.RemoveClientFor(addr)
289 } else {
290 level.Debug(p.logger).Log(
291 "msg", fmt.Sprintf("%s failed healthcheck within grace period, not removing", p.clientName),
292 "addr", addr,
293 "reason", err,
294 "first_failed_at", member.firstFailedHealthCheck,
295 )
296 }
297
298 // Never return an error, because otherwise the processing would stop and
299 // remaining health checks would not be executed.
300 return nil
301 })
302}
303
304// healthCheck will check if the client is still healthy, returning an error if it is not
305func healthCheck(ctx context.Context, client PoolClient, timeout time.Duration) error {

Callers 4

iterationMethod · 0.95
TestCleanUnhealthyFunction · 0.95
TestRemoveClientFunction · 0.95

Calls 6

RegisteredAddressesMethod · 0.95
fromCacheMethod · 0.95
RemoveClientForMethod · 0.95
ForEachJobFunction · 0.92
healthCheckFunction · 0.85
LogMethod · 0.45

Tested by 3

TestCleanUnhealthyFunction · 0.76
TestRemoveClientFunction · 0.76