OffsetCommit sends an offset commit request to a kafka broker and returns the response.
(ctx context.Context, req *OffsetCommitRequest)
| 75 | // OffsetCommit sends an offset commit request to a kafka broker and returns the |
| 76 | // response. |
| 77 | func (c *Client) OffsetCommit(ctx context.Context, req *OffsetCommitRequest) (*OffsetCommitResponse, error) { |
| 78 | now := time.Now().UnixNano() / int64(time.Millisecond) |
| 79 | topics := make([]offsetcommit.RequestTopic, 0, len(req.Topics)) |
| 80 | |
| 81 | for topicName, commits := range req.Topics { |
| 82 | partitions := make([]offsetcommit.RequestPartition, len(commits)) |
| 83 | |
| 84 | for i, c := range commits { |
| 85 | partitions[i] = offsetcommit.RequestPartition{ |
| 86 | PartitionIndex: int32(c.Partition), |
| 87 | CommittedOffset: c.Offset, |
| 88 | CommittedMetadata: c.Metadata, |
| 89 | // This field existed in v1 of the OffsetCommit API; setting it |
| 90 | // to the current timestamp is probably a safe thing to do, but |
| 91 | // it is hard to tell. |
| 92 | CommitTimestamp: now, |
| 93 | } |
| 94 | } |
| 95 | |
| 96 | topics = append(topics, offsetcommit.RequestTopic{ |
| 97 | Name: topicName, |
| 98 | Partitions: partitions, |
| 99 | }) |
| 100 | } |
| 101 | |
| 102 | m, err := c.roundTrip(ctx, req.Addr, &offsetcommit.Request{ |
| 103 | GroupID: req.GroupID, |
| 104 | GenerationID: int32(req.GenerationID), |
| 105 | MemberID: req.MemberID, |
| 106 | GroupInstanceID: req.InstanceID, |
| 107 | Topics: topics, |
| 108 | // Hardcoded retention; this field existed between v2 and v4 of the |
| 109 | // OffsetCommit API. We would have to figure out a way to give the |
| 110 | // client control over the API version being used to support configuring |
| 111 | // it in the request object. |
| 112 | RetentionTimeMs: int64((24 * time.Hour) / time.Millisecond), |
| 113 | }) |
| 114 | if err != nil { |
| 115 | return nil, fmt.Errorf("kafka.(*Client).OffsetCommit: %w", err) |
| 116 | } |
| 117 | r := m.(*offsetcommit.Response) |
| 118 | |
| 119 | res := &OffsetCommitResponse{ |
| 120 | Throttle: makeDuration(r.ThrottleTimeMs), |
| 121 | Topics: make(map[string][]OffsetCommitPartition, len(r.Topics)), |
| 122 | } |
| 123 | |
| 124 | for _, topic := range r.Topics { |
| 125 | partitions := make([]OffsetCommitPartition, len(topic.Partitions)) |
| 126 | |
| 127 | for i, p := range topic.Partitions { |
| 128 | partitions[i] = OffsetCommitPartition{ |
| 129 | Partition: int(p.PartitionIndex), |
| 130 | Error: makeError(p.ErrorCode, ""), |
| 131 | } |
| 132 | } |
| 133 | |
| 134 | res.Topics[topic.Name] = partitions |