| 522 | } |
| 523 | |
| 524 | func (child *partitionConsumer) computeBackoff() time.Duration { |
| 525 | retries := child.retries.Add(1) |
| 526 | if retries >= stuckRetryThreshold && retries%stuckRetryThreshold == 0 { |
| 527 | Logger.Printf("consumer/%s/%d still retrying after %d consecutive failures\n", |
| 528 | child.topic, child.partition, retries) |
| 529 | } |
| 530 | if child.conf.Consumer.Retry.BackoffFunc != nil { |
| 531 | return child.conf.Consumer.Retry.BackoffFunc(int(retries)) |
| 532 | } |
| 533 | return child.conf.Consumer.Retry.Backoff |
| 534 | } |
| 535 | |
| 536 | func (child *partitionConsumer) dispatcher() { |
| 537 | defer func() { |