()
| 290 | } |
| 291 | |
// acquireLoop repeatedly acquires and runs jobs until the server is told to
// stop. Each iteration:
//
//  1. waits on the retrier, bound to p.closeContext; the loop ends when
//     retrier.Wait reports false (presumably on context cancellation —
//     confirm against the retry package);
//  2. returns immediately if acquireExit reports that the loop should exit;
//  3. obtains a client via p.client, logging and returning if none is
//     available;
//  4. calls acquireAndRunOne with that client.
//
// When acquireAndRunOne fails and the close context is still live, the loop
// re-checks acquireExit (so it does not sit through another retry delay just
// to exit) and otherwise logs a warning that includes the retrier's current
// delay. In every other case the retrier is reset. Note that this includes
// the case where an error was returned but the context is already done, not
// only a successful acquisition.
//
// Deferred calls run in reverse order of registration on return:
// p.acquireDoneCh is closed first, then p.wg.Done is called, and finally the
// "acquire loop exited" debug message is logged.
| 292 | func (p *Server) acquireLoop() { |
| 293 | defer p.opts.Logger.Debug(p.closeContext, "acquire loop exited") |
| 294 | defer p.wg.Done() |
| 295 | defer func() { close(p.acquireDoneCh) }() |
| 296 | ctx := p.closeContext |
| 297 | for retrier := retry.New(10*time.Millisecond, 1*time.Second); retrier.Wait(ctx); { |
| 298 | if p.acquireExit() { |
| 299 | return |
| 300 | } |
| 301 | client, ok := p.client() |
| 302 | if !ok { |
| 303 | p.opts.Logger.Debug(ctx, "shut down before client (re) connected") |
| 304 | return |
| 305 | } |
| 306 | err := p.acquireAndRunOne(client) |
| 307 | if err != nil && ctx.Err() == nil { // Only log if context is not done. |
| 308 | // Short-circuit: don't wait for the retry delay to exit, if required. |
| 309 | if p.acquireExit() { |
| 310 | return |
| 311 | } |
| 312 | p.opts.Logger.Warn(ctx, "failed to acquire job, retrying", slog.F("delay", fmt.Sprintf("%vms", retrier.Delay.Milliseconds())), slog.Error(err)) |
| 313 | } else { |
| 314 | // Reset the retrier after each successful acquisition. |
| 315 | retrier.Reset() |
| 316 | } |
| 317 | } |
| 318 | } |
| 319 | |
| 320 | // acquireExit returns true if the acquire loop should exit |
| 321 | func (p *Server) acquireExit() bool { |
no test coverage detected