(coordinator *Broker, memberID string, generationID int32)
| 625 | } |
| 626 | |
| 627 | func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) { |
| 628 | req := &HeartbeatRequest{ |
| 629 | GroupId: c.groupID, |
| 630 | MemberId: memberID, |
| 631 | GenerationId: generationID, |
| 632 | } |
| 633 | |
| 634 | // Version 1 and version 2 are the same as version 0. |
| 635 | if c.config.Version.IsAtLeast(V0_11_0_0) { |
| 636 | req.Version = 1 |
| 637 | } |
| 638 | if c.config.Version.IsAtLeast(V2_0_0_0) { |
| 639 | req.Version = 2 |
| 640 | } |
| 641 | // Starting from version 3, we add a new field called groupInstanceId to indicate member identity across restarts. |
| 642 | if c.config.Version.IsAtLeast(V2_3_0_0) { |
| 643 | req.Version = 3 |
| 644 | req.GroupInstanceId = c.groupInstanceId |
| 645 | // Version 4 is the first flexible version |
| 646 | if c.config.Version.IsAtLeast(V2_4_0_0) { |
| 647 | req.Version = 4 |
| 648 | } |
| 649 | } |
| 650 | |
| 651 | return coordinator.Heartbeat(req) |
| 652 | } |
| 653 | |
| 654 | func (c *consumerGroup) balance(strategy BalanceStrategy, members map[string]ConsumerGroupMemberMetadata) (map[string][]int32, []string, BalanceStrategyPlan, error) { |
| 655 | topicPartitions := make(map[string][]int32) |
no test coverage detected