MCPcopy
hub / github.com/IBM/sarama / dispatcher

Method dispatcher

consumer.go:536–586  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

534}
535
536func (child *partitionConsumer) dispatcher() {
537 defer func() {
538 if child.broker != nil {
539 child.consumer.unrefBrokerConsumer(child.broker)
540 }
541 child.consumer.removeChild(child)
542 close(child.feeder)
543 }()
544
545 var backoff <-chan time.Time
546 for {
547 select {
548 case <-child.dispatcherStop:
549 return
550 case <-child.dying:
551 child.waitForBrokerHandover()
552 return
553 case <-child.trigger:
554 if max := child.conf.Consumer.Retry.Max; max > 0 && int(child.retries.Load()) >= max {
555 Logger.Printf("consumer/%s/%d giving up after %d consecutive failures\n",
556 child.topic, child.partition, child.retries.Load())
557 child.sendError(ErrConsumerRetriesExhausted)
558 child.AsyncClose()
559 child.waitForBrokerHandover()
560 return
561 }
562 // only set the timer when none is pending, so retries increments
563 // once per dispatch attempt rather than once per trigger
564 if backoff == nil {
565 backoff = time.After(child.computeBackoff())
566 }
567 case <-backoff:
568 backoff = nil
569 if child.broker != nil {
570 child.consumer.unrefBrokerConsumer(child.broker)
571 child.broker = nil
572 }
573 if err := child.dispatch(); err != nil {
574 select {
575 case <-child.dispatcherStop:
576 return
577 case <-child.dying:
578 return
579 default:
580 child.sendError(err)
581 child.triggerRedispatch()
582 }
583 }
584 }
585 }
586}
587
588// waitForBrokerHandover blocks until the brokerConsumer releases the current
589// subscription, so the deferred close(feeder) cannot race an in-flight

Calls 9

waitForBrokerHandoverMethod · 0.95
sendErrorMethod · 0.95
AsyncCloseMethod · 0.95
computeBackoffMethod · 0.95
dispatchMethod · 0.95
triggerRedispatchMethod · 0.95
unrefBrokerConsumerMethod · 0.80
removeChildMethod · 0.80
PrintfMethod · 0.65