(strategy BalanceStrategy, members map[string]ConsumerGroupMemberMetadata)
| 652 | } |
| 653 | |
| 654 | func (c *consumerGroup) balance(strategy BalanceStrategy, members map[string]ConsumerGroupMemberMetadata) (map[string][]int32, []string, BalanceStrategyPlan, error) { |
| 655 | topicPartitions := make(map[string][]int32) |
| 656 | for _, meta := range members { |
| 657 | for _, topic := range meta.Topics { |
| 658 | topicPartitions[topic] = nil |
| 659 | } |
| 660 | } |
| 661 | |
| 662 | allSubscribedTopics := make([]string, 0, len(topicPartitions)) |
| 663 | for topic := range topicPartitions { |
| 664 | allSubscribedTopics = append(allSubscribedTopics, topic) |
| 665 | } |
| 666 | |
| 667 | // refresh metadata for all the subscribed topics in the consumer group |
| 668 | // to avoid using stale metadata to assigning partitions |
| 669 | err := c.client.RefreshMetadata(allSubscribedTopics...) |
| 670 | if err != nil { |
| 671 | return nil, nil, nil, err |
| 672 | } |
| 673 | |
| 674 | for topic := range topicPartitions { |
| 675 | partitions, err := c.client.Partitions(topic) |
| 676 | if err != nil { |
| 677 | return nil, nil, nil, err |
| 678 | } |
| 679 | topicPartitions[topic] = partitions |
| 680 | } |
| 681 | |
| 682 | plan, err := strategy.Plan(members, topicPartitions) |
| 683 | return topicPartitions, allSubscribedTopics, plan, err |
| 684 | } |
| 685 | |
| 686 | // Leaves the cluster, called by Close. |
| 687 | func (c *consumerGroup) leave() error { |
no test coverage detected