makeJoinGroupRequest handles the logic of constructing a joinGroup request.
(memberID string)
| 979 | // makeJoinGroupRequestV1 handles the logic of constructing a joinGroup |
| 980 | // request. |
| 981 | func (cg *ConsumerGroup) makeJoinGroupRequest(memberID string) (joinGroupRequest, error) { |
| 982 | request := joinGroupRequest{ |
| 983 | GroupID: cg.config.ID, |
| 984 | MemberID: memberID, |
| 985 | SessionTimeout: int32(cg.config.SessionTimeout / time.Millisecond), |
| 986 | RebalanceTimeout: int32(cg.config.RebalanceTimeout / time.Millisecond), |
| 987 | ProtocolType: defaultProtocolType, |
| 988 | } |
| 989 | |
| 990 | for _, balancer := range cg.config.GroupBalancers { |
| 991 | userData, err := balancer.UserData() |
| 992 | if err != nil { |
| 993 | return joinGroupRequest{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err) |
| 994 | } |
| 995 | request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{ |
| 996 | ProtocolName: balancer.ProtocolName(), |
| 997 | ProtocolMetadata: groupMetadata{ |
| 998 | Version: 1, |
| 999 | Topics: cg.config.Topics, |
| 1000 | UserData: userData, |
| 1001 | }.bytes(), |
| 1002 | }) |
| 1003 | } |
| 1004 | |
| 1005 | return request, nil |
| 1006 | } |
| 1007 | |
| 1008 | // assignTopicPartitions uses the selected GroupBalancer to assign members to |
| 1009 | // their various partitions. |
no test coverage detected