MCPcopy
hub / github.com/IBM/sarama / handleError

Method handleError

consumer_group.go:745–774  ·  view source on GitHub ↗
(err error, topic string, partition int32)

Source from the content-addressed store, hash-verified

743}
744
745func (c *consumerGroup) handleError(err error, topic string, partition int32) {
746 var consumerError *ConsumerError
747 if ok := errors.As(err, &consumerError); !ok && topic != "" && partition > -1 {
748 err = &ConsumerError{
749 Topic: topic,
750 Partition: partition,
751 Err: err,
752 }
753 }
754
755 if !c.config.Consumer.Return.Errors {
756 Logger.Println(err)
757 return
758 }
759
760 c.errorsLock.RLock()
761 defer c.errorsLock.RUnlock()
762 select {
763 case <-c.closed:
764 // consumer is closed
765 return
766 default:
767 }
768
769 select {
770 case c.errors <- err:
771 default:
772 // no error listener
773 }
774}
775
776func (c *consumerGroup) loopCheckPartitionNumbers(allSubscribedTopicPartitions map[string][]int32, topics []string, session *consumerGroupSession) {
777 if c.config.Metadata.RefreshFrequency == time.Duration(0) {

Callers 5

newConsumerGroupSessionFunction · 0.45
consumeMethod · 0.45
releaseMethod · 0.45
heartbeatLoopMethod · 0.45
newConsumerGroupClaimFunction · 0.45

Calls 1

PrintlnMethod · 0.65

Tested by

no test coverage detected