(allSubscribedTopicPartitions map[string][]int32, topics []string, session *consumerGroupSession)
| 774 | } |
| 775 | |
| 776 | func (c *consumerGroup) loopCheckPartitionNumbers(allSubscribedTopicPartitions map[string][]int32, topics []string, session *consumerGroupSession) { |
| 777 | if c.config.Metadata.RefreshFrequency == time.Duration(0) { |
| 778 | return |
| 779 | } |
| 780 | |
| 781 | defer session.cancel(ErrSessionPartitionCountChanged) |
| 782 | |
| 783 | oldTopicToPartitionNum := make(map[string]int, len(allSubscribedTopicPartitions)) |
| 784 | for topic, partitions := range allSubscribedTopicPartitions { |
| 785 | oldTopicToPartitionNum[topic] = len(partitions) |
| 786 | } |
| 787 | |
| 788 | pause := time.NewTicker(c.config.Metadata.RefreshFrequency) |
| 789 | defer pause.Stop() |
| 790 | for { |
| 791 | if newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics); err != nil { |
| 792 | return |
| 793 | } else { |
| 794 | for topic, num := range oldTopicToPartitionNum { |
| 795 | if newTopicToPartitionNum[topic] != num { |
| 796 | Logger.Printf( |
| 797 | "consumergroup/%s loop check partition number goroutine find partitions in topics %s changed from %d to %d\n", |
| 798 | c.groupID, topics, num, newTopicToPartitionNum[topic]) |
| 799 | return // trigger the end of the session on exit |
| 800 | } |
| 801 | } |
| 802 | } |
| 803 | select { |
| 804 | case <-pause.C: |
| 805 | case <-session.ctx.Done(): |
| 806 | Logger.Printf( |
| 807 | "consumergroup/%s loop check partition number goroutine will exit, topics %s\n", |
| 808 | c.groupID, topics) |
| 809 | // if session closed by other, should be exited |
| 810 | return |
| 811 | case <-c.closed: |
| 812 | return |
| 813 | } |
| 814 | } |
| 815 | } |
| 816 | |
| 817 | func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) { |
| 818 | topicToPartitionNum := make(map[string]int, len(topics)) |
no test coverage detected