MCPcopy
hub / github.com/segmentio/kafka-go / OffsetCommit

Method OffsetCommit

offsetcommit.go:77–138  ·  view source on GitHub ↗

OffsetCommit sends an offset commit request to a kafka broker and returns the response.

(ctx context.Context, req *OffsetCommitRequest)

Source from the content-addressed store, hash-verified

75// OffsetCommit sends an offset commit request to a kafka broker and returns the
76// response.
77func (c *Client) OffsetCommit(ctx context.Context, req *OffsetCommitRequest) (*OffsetCommitResponse, error) {
78 now := time.Now().UnixNano() / int64(time.Millisecond)
79 topics := make([]offsetcommit.RequestTopic, 0, len(req.Topics))
80
81 for topicName, commits := range req.Topics {
82 partitions := make([]offsetcommit.RequestPartition, len(commits))
83
84 for i, c := range commits {
85 partitions[i] = offsetcommit.RequestPartition{
86 PartitionIndex: int32(c.Partition),
87 CommittedOffset: c.Offset,
88 CommittedMetadata: c.Metadata,
89 // This field existed in v1 of the OffsetCommit API, setting it
90 // to the current timestamp is probably a safe thing to do, but
91 // it is hard to tell.
92 CommitTimestamp: now,
93 }
94 }
95
96 topics = append(topics, offsetcommit.RequestTopic{
97 Name: topicName,
98 Partitions: partitions,
99 })
100 }
101
102 m, err := c.roundTrip(ctx, req.Addr, &offsetcommit.Request{
103 GroupID: req.GroupID,
104 GenerationID: int32(req.GenerationID),
105 MemberID: req.MemberID,
106 GroupInstanceID: req.InstanceID,
107 Topics: topics,
108 // Hardcoded retention; this field existed between v2 and v4 of the
109 // OffsetCommit API, we would have to figure out a way to give the
110 // client control over the API version being used to support configuring
111 // it in the request object.
112 RetentionTimeMs: int64((24 * time.Hour) / time.Millisecond),
113 })
114 if err != nil {
115 return nil, fmt.Errorf("kafka.(*Client).OffsetCommit: %w", err)
116 }
117 r := m.(*offsetcommit.Response)
118
119 res := &OffsetCommitResponse{
120 Throttle: makeDuration(r.ThrottleTimeMs),
121 Topics: make(map[string][]OffsetCommitPartition, len(r.Topics)),
122 }
123
124 for _, topic := range r.Topics {
125 partitions := make([]OffsetCommitPartition, len(topic.Partitions))
126
127 for i, p := range topic.Partitions {
128 partitions[i] = OffsetCommitPartition{
129 Partition: int(p.PartitionIndex),
130 Error: makeError(p.ErrorCode, ""),
131 }
132 }
133
134 res.Topics[topic.Name] = partitions

Callers 2

TestClientDeleteOffsetFunction · 0.80
TestClientOffsetCommitFunction · 0.80

Calls 3

roundTripMethod · 0.95
makeDurationFunction · 0.85
makeErrorFunction · 0.85

Tested by 2

TestClientDeleteOffsetFunction · 0.64
TestClientOffsetCommitFunction · 0.64