MCPcopy
hub / github.com/IBM/sarama / balance

Method balance

consumer_group.go:654–684  ·  view source on GitHub ↗
(strategy BalanceStrategy, members map[string]ConsumerGroupMemberMetadata)

Source from the content-addressed store, hash-verified

652}
653
654func (c *consumerGroup) balance(strategy BalanceStrategy, members map[string]ConsumerGroupMemberMetadata) (map[string][]int32, []string, BalanceStrategyPlan, error) {
655 topicPartitions := make(map[string][]int32)
656 for _, meta := range members {
657 for _, topic := range meta.Topics {
658 topicPartitions[topic] = nil
659 }
660 }
661
662 allSubscribedTopics := make([]string, 0, len(topicPartitions))
663 for topic := range topicPartitions {
664 allSubscribedTopics = append(allSubscribedTopics, topic)
665 }
666
667 // refresh metadata for all the subscribed topics in the consumer group
668 // to avoid using stale metadata to assigning partitions
669 err := c.client.RefreshMetadata(allSubscribedTopics...)
670 if err != nil {
671 return nil, nil, nil, err
672 }
673
674 for topic := range topicPartitions {
675 partitions, err := c.client.Partitions(topic)
676 if err != nil {
677 return nil, nil, nil, err
678 }
679 topicPartitions[topic] = partitions
680 }
681
682 plan, err := strategy.Plan(members, topicPartitions)
683 return topicPartitions, allSubscribedTopics, plan, err
684}
685
686// Leaves the cluster, called by Close.
687func (c *consumerGroup) leave() error {

Callers 1

newSessionMethod · 0.95

Calls 3

RefreshMetadataMethod · 0.65
PartitionsMethod · 0.65
PlanMethod · 0.65

Tested by

no test coverage detected