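cleanUnhealthy loops through all registered servers, health-checks each one, and deletes any whose health checks have been failing for at least p.cfg.HealthCheckGracePeriod. The health checks are executed concurrently, with the concurrency set by p.cfg.MaxConcurrentHealthChecks.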
()
| 259 | // cleanUnhealthy loops through all servers and deletes any that fail a healthcheck. |
| 260 | // The health checks are executed concurrently with p.cfg.MaxConcurrentHealthChecks. |
| 261 | func (p *Pool) cleanUnhealthy() { |
| 262 | addresses := p.RegisteredAddresses() |
| 263 | _ = concurrency.ForEachJob(context.Background(), len(addresses), p.cfg.MaxConcurrentHealthChecks, func(ctx context.Context, idx int) error { |
| 264 | addr := addresses[idx] |
| 265 | member, ok := p.fromCache(addr) |
| 266 | // not ok means someone removed a client between the start of this loop and now |
| 267 | if !ok { |
| 268 | return nil |
| 269 | } |
| 270 | |
| 271 | err := healthCheck(ctx, member.client, p.cfg.HealthCheckTimeout) |
| 272 | if err == nil { |
| 273 | member.firstFailedHealthCheck = time.Time{} |
| 274 | return nil |
| 275 | } |
| 276 | |
| 277 | if member.firstFailedHealthCheck.IsZero() { |
| 278 | member.firstFailedHealthCheck = time.Now() |
| 279 | } |
| 280 | |
| 281 | if time.Since(member.firstFailedHealthCheck) >= p.cfg.HealthCheckGracePeriod { |
| 282 | level.Warn(p.logger).Log( |
| 283 | "msg", fmt.Sprintf("removing %s failing healthcheck", p.clientName), |
| 284 | "addr", addr, |
| 285 | "reason", err, |
| 286 | "first_failed_at", member.firstFailedHealthCheck, |
| 287 | ) |
| 288 | p.RemoveClientFor(addr) |
| 289 | } else { |
| 290 | level.Debug(p.logger).Log( |
| 291 | "msg", fmt.Sprintf("%s failed healthcheck within grace period, not removing", p.clientName), |
| 292 | "addr", addr, |
| 293 | "reason", err, |
| 294 | "first_failed_at", member.firstFailedHealthCheck, |
| 295 | ) |
| 296 | } |
| 297 | |
| 298 | // Never return an error, because otherwise the processing would stop and |
| 299 | // remaining health checks would not be executed. |
| 300 | return nil |
| 301 | }) |
| 302 | } |
| 303 | |
| 304 | // healthCheck will check if the client is still healthy, returning an error if it is not |
| 305 | func healthCheck(ctx context.Context, client PoolClient, timeout time.Duration) error { |