| 534 | } |
| 535 | |
| 536 | func (child *partitionConsumer) dispatcher() { |
| 537 | defer func() { |
| 538 | if child.broker != nil { |
| 539 | child.consumer.unrefBrokerConsumer(child.broker) |
| 540 | } |
| 541 | child.consumer.removeChild(child) |
| 542 | close(child.feeder) |
| 543 | }() |
| 544 | |
| 545 | var backoff <-chan time.Time |
| 546 | for { |
| 547 | select { |
| 548 | case <-child.dispatcherStop: |
| 549 | return |
| 550 | case <-child.dying: |
| 551 | child.waitForBrokerHandover() |
| 552 | return |
| 553 | case <-child.trigger: |
| 554 | if max := child.conf.Consumer.Retry.Max; max > 0 && int(child.retries.Load()) >= max { |
| 555 | Logger.Printf("consumer/%s/%d giving up after %d consecutive failures\n", |
| 556 | child.topic, child.partition, child.retries.Load()) |
| 557 | child.sendError(ErrConsumerRetriesExhausted) |
| 558 | child.AsyncClose() |
| 559 | child.waitForBrokerHandover() |
| 560 | return |
| 561 | } |
| 562 | // only set the timer when none is pending, so retries increments |
| 563 | // once per dispatch attempt rather than once per trigger |
| 564 | if backoff == nil { |
| 565 | backoff = time.After(child.computeBackoff()) |
| 566 | } |
| 567 | case <-backoff: |
| 568 | backoff = nil |
| 569 | if child.broker != nil { |
| 570 | child.consumer.unrefBrokerConsumer(child.broker) |
| 571 | child.broker = nil |
| 572 | } |
| 573 | if err := child.dispatch(); err != nil { |
| 574 | select { |
| 575 | case <-child.dispatcherStop: |
| 576 | return |
| 577 | case <-child.dying: |
| 578 | return |
| 579 | default: |
| 580 | child.sendError(err) |
| 581 | child.triggerRedispatch() |
| 582 | } |
| 583 | } |
| 584 | } |
| 585 | } |
| 586 | } |
| 587 | |
| 588 | // waitForBrokerHandover blocks until the brokerConsumer releases the current |
| 589 | // subscription, so the deferred close(feeder) cannot race an in-flight |