(err error, topic string, partition int32)
| 743 | } |
| 744 | |
| 745 | func (c *consumerGroup) handleError(err error, topic string, partition int32) { |
| 746 | var consumerError *ConsumerError |
| 747 | if ok := errors.As(err, &consumerError); !ok && topic != "" && partition > -1 { |
| 748 | err = &ConsumerError{ |
| 749 | Topic: topic, |
| 750 | Partition: partition, |
| 751 | Err: err, |
| 752 | } |
| 753 | } |
| 754 | |
| 755 | if !c.config.Consumer.Return.Errors { |
| 756 | Logger.Println(err) |
| 757 | return |
| 758 | } |
| 759 | |
| 760 | c.errorsLock.RLock() |
| 761 | defer c.errorsLock.RUnlock() |
| 762 | select { |
| 763 | case <-c.closed: |
| 764 | // consumer is closed |
| 765 | return |
| 766 | default: |
| 767 | } |
| 768 | |
| 769 | select { |
| 770 | case c.errors <- err: |
| 771 | default: |
| 772 | // no error listener |
| 773 | } |
| 774 | } |
| 775 | |
| 776 | func (c *consumerGroup) loopCheckPartitionNumbers(allSubscribedTopicPartitions map[string][]int32, topics []string, session *consumerGroupSession) { |
| 777 | if c.config.Metadata.RefreshFrequency == time.Duration(0) { |
no test coverage detected