SyncGroup sends a sync group request to the coordinator and returns the response.
func (c *Client) SyncGroup(ctx context.Context, req *SyncGroupRequest) (*SyncGroupResponse, error)
| 81 | |
| 82 | // SyncGroup sends a sync group request to the coordinator and returns the response. |
| 83 | func (c *Client) SyncGroup(ctx context.Context, req *SyncGroupRequest) (*SyncGroupResponse, error) { |
| 84 | syncGroup := syncgroup.Request{ |
| 85 | GroupID: req.GroupID, |
| 86 | GenerationID: int32(req.GenerationID), |
| 87 | MemberID: req.MemberID, |
| 88 | GroupInstanceID: req.GroupInstanceID, |
| 89 | ProtocolType: req.ProtocolType, |
| 90 | ProtocolName: req.ProtocolName, |
| 91 | Assignments: make([]syncgroup.RequestAssignment, 0, len(req.Assignments)), |
| 92 | } |
| 93 | |
| 94 | for _, assignment := range req.Assignments { |
| 95 | assign := consumer.Assignment{ |
| 96 | Version: consumer.MaxVersionSupported, |
| 97 | AssignedPartitions: make([]consumer.TopicPartition, 0, len(assignment.Assignment.AssignedPartitions)), |
| 98 | UserData: assignment.Assignment.UserData, |
| 99 | } |
| 100 | |
| 101 | for topic, partitions := range assignment.Assignment.AssignedPartitions { |
| 102 | tp := consumer.TopicPartition{ |
| 103 | Topic: topic, |
| 104 | Partitions: make([]int32, 0, len(partitions)), |
| 105 | } |
| 106 | for _, partition := range partitions { |
| 107 | tp.Partitions = append(tp.Partitions, int32(partition)) |
| 108 | } |
| 109 | assign.AssignedPartitions = append(assign.AssignedPartitions, tp) |
| 110 | } |
| 111 | |
| 112 | assignBytes, err := protocol.Marshal(consumer.MaxVersionSupported, assign) |
| 113 | if err != nil { |
| 114 | return nil, fmt.Errorf("kafka.(*Client).SyncGroup: %w", err) |
| 115 | } |
| 116 | |
| 117 | syncGroup.Assignments = append(syncGroup.Assignments, syncgroup.RequestAssignment{ |
| 118 | MemberID: assignment.MemberID, |
| 119 | Assignment: assignBytes, |
| 120 | }) |
| 121 | } |
| 122 | |
| 123 | m, err := c.roundTrip(ctx, req.Addr, &syncGroup) |
| 124 | if err != nil { |
| 125 | return nil, fmt.Errorf("kafka.(*Client).SyncGroup: %w", err) |
| 126 | } |
| 127 | |
| 128 | r := m.(*syncgroup.Response) |
| 129 | |
| 130 | var assignment consumer.Assignment |
| 131 | err = protocol.Unmarshal(r.Assignments, consumer.MaxVersionSupported, &assignment) |
| 132 | if err != nil { |
| 133 | return nil, fmt.Errorf("kafka.(*Client).SyncGroup: %w", err) |
| 134 | } |
| 135 | |
| 136 | res := &SyncGroupResponse{ |
| 137 | Throttle: makeDuration(r.ThrottleTimeMS), |
| 138 | Error: makeError(r.ErrorCode, ""), |
| 139 | ProtocolType: r.ProtocolType, |
| 140 | ProtocolName: r.ProtocolName, |