MCPcopy
hub / github.com/segmentio/kafka-go / JoinGroup

Method JoinGroup

joingroup.go:107–190  ·  view source on GitHub ↗

JoinGroup sends a join group request to the coordinator and returns the response.

(ctx context.Context, req *JoinGroupRequest)

Source from the content-addressed store, hash-verified

105
106// JoinGroup sends a join group request to the coordinator and returns the response.
107func (c *Client) JoinGroup(ctx context.Context, req *JoinGroupRequest) (*JoinGroupResponse, error) {
108 joinGroup := joingroup.Request{
109 GroupID: req.GroupID,
110 SessionTimeoutMS: int32(req.SessionTimeout.Milliseconds()),
111 RebalanceTimeoutMS: int32(req.RebalanceTimeout.Milliseconds()),
112 MemberID: req.MemberID,
113 GroupInstanceID: req.GroupInstanceID,
114 ProtocolType: req.ProtocolType,
115 Protocols: make([]joingroup.RequestProtocol, 0, len(req.Protocols)),
116 }
117
118 for _, proto := range req.Protocols {
119 protoMeta := consumer.Subscription{
120 Version: consumer.MaxVersionSupported,
121 Topics: proto.Metadata.Topics,
122 UserData: proto.Metadata.UserData,
123 OwnedPartitions: make([]consumer.TopicPartition, 0, len(proto.Metadata.OwnedPartitions)),
124 }
125 for topic, partitions := range proto.Metadata.OwnedPartitions {
126 tp := consumer.TopicPartition{
127 Topic: topic,
128 Partitions: make([]int32, 0, len(partitions)),
129 }
130 for _, partition := range partitions {
131 tp.Partitions = append(tp.Partitions, int32(partition))
132 }
133 protoMeta.OwnedPartitions = append(protoMeta.OwnedPartitions, tp)
134 }
135
136 metaBytes, err := protocol.Marshal(consumer.MaxVersionSupported, protoMeta)
137 if err != nil {
138 return nil, fmt.Errorf("kafka.(*Client).JoinGroup: %w", err)
139 }
140
141 joinGroup.Protocols = append(joinGroup.Protocols, joingroup.RequestProtocol{
142 Name: proto.Name,
143 Metadata: metaBytes,
144 })
145 }
146
147 m, err := c.roundTrip(ctx, req.Addr, &joinGroup)
148 if err != nil {
149 return nil, fmt.Errorf("kafka.(*Client).JoinGroup: %w", err)
150 }
151
152 r := m.(*joingroup.Response)
153
154 res := &JoinGroupResponse{
155 Error: makeError(r.ErrorCode, ""),
156 Throttle: makeDuration(r.ThrottleTimeMS),
157 GenerationID: int(r.GenerationID),
158 ProtocolName: r.ProtocolName,
159 ProtocolType: r.ProtocolType,
160 LeaderID: r.LeaderID,
161 MemberID: r.MemberID,
162 Members: make([]JoinGroupResponseMember, 0, len(r.Members)),
163 }
164

Callers 3

TestClientJoinGroup · Function · 0.80
TestClientLeaveGroup · Function · 0.80
TestClientSyncGroup · Function · 0.80

Calls 5

roundTrip · Method · 0.95
Marshal · Function · 0.92
Unmarshal · Function · 0.92
makeError · Function · 0.85
makeDuration · Function · 0.85

Tested by 3

TestClientJoinGroup · Function · 0.64
TestClientLeaveGroup · Function · 0.64
TestClientSyncGroup · Function · 0.64