JoinGroup sends a join group request to the coordinator and returns the response.
(ctx context.Context, req *JoinGroupRequest)
| 105 | |
| 106 | // JoinGroup sends a join group request to the coordinator and returns the response. |
| 107 | func (c *Client) JoinGroup(ctx context.Context, req *JoinGroupRequest) (*JoinGroupResponse, error) { |
| 108 | joinGroup := joingroup.Request{ |
| 109 | GroupID: req.GroupID, |
| 110 | SessionTimeoutMS: int32(req.SessionTimeout.Milliseconds()), |
| 111 | RebalanceTimeoutMS: int32(req.RebalanceTimeout.Milliseconds()), |
| 112 | MemberID: req.MemberID, |
| 113 | GroupInstanceID: req.GroupInstanceID, |
| 114 | ProtocolType: req.ProtocolType, |
| 115 | Protocols: make([]joingroup.RequestProtocol, 0, len(req.Protocols)), |
| 116 | } |
| 117 | |
| 118 | for _, proto := range req.Protocols { |
| 119 | protoMeta := consumer.Subscription{ |
| 120 | Version: consumer.MaxVersionSupported, |
| 121 | Topics: proto.Metadata.Topics, |
| 122 | UserData: proto.Metadata.UserData, |
| 123 | OwnedPartitions: make([]consumer.TopicPartition, 0, len(proto.Metadata.OwnedPartitions)), |
| 124 | } |
| 125 | for topic, partitions := range proto.Metadata.OwnedPartitions { |
| 126 | tp := consumer.TopicPartition{ |
| 127 | Topic: topic, |
| 128 | Partitions: make([]int32, 0, len(partitions)), |
| 129 | } |
| 130 | for _, partition := range partitions { |
| 131 | tp.Partitions = append(tp.Partitions, int32(partition)) |
| 132 | } |
| 133 | protoMeta.OwnedPartitions = append(protoMeta.OwnedPartitions, tp) |
| 134 | } |
| 135 | |
| 136 | metaBytes, err := protocol.Marshal(consumer.MaxVersionSupported, protoMeta) |
| 137 | if err != nil { |
| 138 | return nil, fmt.Errorf("kafka.(*Client).JoinGroup: %w", err) |
| 139 | } |
| 140 | |
| 141 | joinGroup.Protocols = append(joinGroup.Protocols, joingroup.RequestProtocol{ |
| 142 | Name: proto.Name, |
| 143 | Metadata: metaBytes, |
| 144 | }) |
| 145 | } |
| 146 | |
| 147 | m, err := c.roundTrip(ctx, req.Addr, &joinGroup) |
| 148 | if err != nil { |
| 149 | return nil, fmt.Errorf("kafka.(*Client).JoinGroup: %w", err) |
| 150 | } |
| 151 | |
| 152 | r := m.(*joingroup.Response) |
| 153 | |
| 154 | res := &JoinGroupResponse{ |
| 155 | Error: makeError(r.ErrorCode, ""), |
| 156 | Throttle: makeDuration(r.ThrottleTimeMS), |
| 157 | GenerationID: int(r.GenerationID), |
| 158 | ProtocolName: r.ProtocolName, |
| 159 | ProtocolType: r.ProtocolType, |
| 160 | LeaderID: r.LeaderID, |
| 161 | MemberID: r.MemberID, |
| 162 | Members: make([]JoinGroupResponseMember, 0, len(r.Members)), |
| 163 | } |
| 164 |