MCPcopy
hub / github.com/segmentio/kafka-go / SyncGroup

Method SyncGroup

syncgroup.go:83–155  ·  view source on GitHub ↗

SyncGroup sends a sync group request to the coordinator and returns the response.

(ctx context.Context, req *SyncGroupRequest)

Source from the content-addressed store, hash-verified

81
82// SyncGroup sends a sync group request to the coordinator and returns the response.
83func (c *Client) SyncGroup(ctx context.Context, req *SyncGroupRequest) (*SyncGroupResponse, error) {
84 syncGroup := syncgroup.Request{
85 GroupID: req.GroupID,
86 GenerationID: int32(req.GenerationID),
87 MemberID: req.MemberID,
88 GroupInstanceID: req.GroupInstanceID,
89 ProtocolType: req.ProtocolType,
90 ProtocolName: req.ProtocolName,
91 Assignments: make([]syncgroup.RequestAssignment, 0, len(req.Assignments)),
92 }
93
94 for _, assignment := range req.Assignments {
95 assign := consumer.Assignment{
96 Version: consumer.MaxVersionSupported,
97 AssignedPartitions: make([]consumer.TopicPartition, 0, len(assignment.Assignment.AssignedPartitions)),
98 UserData: assignment.Assignment.UserData,
99 }
100
101 for topic, partitions := range assignment.Assignment.AssignedPartitions {
102 tp := consumer.TopicPartition{
103 Topic: topic,
104 Partitions: make([]int32, 0, len(partitions)),
105 }
106 for _, partition := range partitions {
107 tp.Partitions = append(tp.Partitions, int32(partition))
108 }
109 assign.AssignedPartitions = append(assign.AssignedPartitions, tp)
110 }
111
112 assignBytes, err := protocol.Marshal(consumer.MaxVersionSupported, assign)
113 if err != nil {
114 return nil, fmt.Errorf("kafka.(*Client).SyncGroup: %w", err)
115 }
116
117 syncGroup.Assignments = append(syncGroup.Assignments, syncgroup.RequestAssignment{
118 MemberID: assignment.MemberID,
119 Assignment: assignBytes,
120 })
121 }
122
123 m, err := c.roundTrip(ctx, req.Addr, &syncGroup)
124 if err != nil {
125 return nil, fmt.Errorf("kafka.(*Client).SyncGroup: %w", err)
126 }
127
128 r := m.(*syncgroup.Response)
129
130 var assignment consumer.Assignment
131 err = protocol.Unmarshal(r.Assignments, consumer.MaxVersionSupported, &assignment)
132 if err != nil {
133 return nil, fmt.Errorf("kafka.(*Client).SyncGroup: %w", err)
134 }
135
136 res := &SyncGroupResponse{
137 Throttle: makeDuration(r.ThrottleTimeMS),
138 Error: makeError(r.ErrorCode, ""),
139 ProtocolType: r.ProtocolType,
140 ProtocolName: r.ProtocolName,

Callers 2

TestClientLeaveGroupFunction · 0.80
TestClientSyncGroupFunction · 0.80

Calls 5

roundTripMethod · 0.95
MarshalFunction · 0.92
UnmarshalFunction · 0.92
makeDurationFunction · 0.85
makeErrorFunction · 0.85

Tested by 2

TestClientLeaveGroupFunction · 0.64
TestClientSyncGroupFunction · 0.64