MCPcopy
hub / github.com/segmentio/kafka-go / OffsetFetch

Method OffsetFetch

offsetfetch.go:68–125  ·  view source on GitHub ↗

OffsetFetch sends an offset fetch request to a kafka broker and returns the response.

(ctx context.Context, req *OffsetFetchRequest)

Source from the content-addressed store, hash-verified

66// OffsetFetch sends an offset fetch request to a kafka broker and returns the
67// response.
68func (c *Client) OffsetFetch(ctx context.Context, req *OffsetFetchRequest) (*OffsetFetchResponse, error) {
69
70 // Kafka version 0.10.2.x and above allow null Topics map for OffsetFetch API
71 // which will return the result for all topics with the desired consumer group:
72 // https://kafka.apache.org/0102/protocol.html#The_Messages_OffsetFetch
73 // For Kafka version below 0.10.2.x this call will result in an error
74 var topics []offsetfetch.RequestTopic
75
76 if len(req.Topics) > 0 {
77 topics = make([]offsetfetch.RequestTopic, 0, len(req.Topics))
78
79 for topicName, partitions := range req.Topics {
80 indexes := make([]int32, len(partitions))
81
82 for i, p := range partitions {
83 indexes[i] = int32(p)
84 }
85
86 topics = append(topics, offsetfetch.RequestTopic{
87 Name: topicName,
88 PartitionIndexes: indexes,
89 })
90 }
91 }
92
93 m, err := c.roundTrip(ctx, req.Addr, &offsetfetch.Request{
94 GroupID: req.GroupID,
95 Topics: topics,
96 })
97
98 if err != nil {
99 return nil, fmt.Errorf("kafka.(*Client).OffsetFetch: %w", err)
100 }
101
102 res := m.(*offsetfetch.Response)
103 ret := &OffsetFetchResponse{
104 Throttle: makeDuration(res.ThrottleTimeMs),
105 Topics: make(map[string][]OffsetFetchPartition, len(res.Topics)),
106 Error: makeError(res.ErrorCode, ""),
107 }
108
109 for _, t := range res.Topics {
110 partitions := make([]OffsetFetchPartition, len(t.Partitions))
111
112 for i, p := range t.Partitions {
113 partitions[i] = OffsetFetchPartition{
114 Partition: int(p.PartitionIndex),
115 CommittedOffset: p.CommittedOffset,
116 Metadata: p.Metadata,
117 Error: makeError(p.ErrorCode, ""),
118 }
119 }
120
121 ret.Topics[t.Name] = partitions
122 }
123
124 return ret, nil
125}

Callers 6

ConsumerOffsetsMethod · 0.95
TestClientDeleteOffsetFunction · 0.80
TestClientOffsetCommitFunction · 0.80

Calls 3

roundTripMethod · 0.95
makeDurationFunction · 0.85
makeErrorFunction · 0.85

Tested by 5

TestClientDeleteOffsetFunction · 0.64
TestClientOffsetCommitFunction · 0.64