MCPcopy
hub / github.com/IBM/sarama / loopCheckPartitionNumbers

Method loopCheckPartitionNumbers

consumer_group.go:776–815  ·  view source on GitHub ↗
(allSubscribedTopicPartitions map[string][]int32, topics []string, session *consumerGroupSession)

Source from the content-addressed store, hash-verified

774}
775
776func (c *consumerGroup) loopCheckPartitionNumbers(allSubscribedTopicPartitions map[string][]int32, topics []string, session *consumerGroupSession) {
777 if c.config.Metadata.RefreshFrequency == time.Duration(0) {
778 return
779 }
780
781 defer session.cancel(ErrSessionPartitionCountChanged)
782
783 oldTopicToPartitionNum := make(map[string]int, len(allSubscribedTopicPartitions))
784 for topic, partitions := range allSubscribedTopicPartitions {
785 oldTopicToPartitionNum[topic] = len(partitions)
786 }
787
788 pause := time.NewTicker(c.config.Metadata.RefreshFrequency)
789 defer pause.Stop()
790 for {
791 if newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics); err != nil {
792 return
793 } else {
794 for topic, num := range oldTopicToPartitionNum {
795 if newTopicToPartitionNum[topic] != num {
796 Logger.Printf(
797 "consumergroup/%s loop check partition number goroutine find partitions in topics %s changed from %d to %d\n",
798 c.groupID, topics, num, newTopicToPartitionNum[topic])
799 return // trigger the end of the session on exit
800 }
801 }
802 }
803 select {
804 case <-pause.C:
805 case <-session.ctx.Done():
806 Logger.Printf(
807 "consumergroup/%s loop check partition number goroutine will exit, topics %s\n",
808 c.groupID, topics)
809 // if session closed by other, should be exited
810 return
811 case <-c.closed:
812 return
813 }
814 }
815}
816
817func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) {
818 topicToPartitionNum := make(map[string]int, len(topics))

Callers 1

newSessionMethod · 0.95

Calls 4

StopMethod · 0.80
PrintfMethod · 0.65
DoneMethod · 0.65

Tested by

no test coverage detected