MCPcopy
hub / github.com/segmentio/kafka-go / ListOffsets

Method ListOffsets

listoffset.go:79–184  ·  view source on GitHub ↗

ListOffsets sends an offset request to a Kafka broker and returns the response.

(ctx context.Context, req *ListOffsetsRequest)

Source from the content-addressed store, hash-verified

77// ListOffsets sends an offset request to a kafka broker and returns the
78// response.
79func (c *Client) ListOffsets(ctx context.Context, req *ListOffsetsRequest) (*ListOffsetsResponse, error) {
80 type topicPartition struct {
81 topic string
82 partition int
83 }
84
85 partitionOffsets := make(map[topicPartition]PartitionOffsets)
86
87 for topicName, requests := range req.Topics {
88 for _, r := range requests {
89 key := topicPartition{
90 topic: topicName,
91 partition: r.Partition,
92 }
93
94 partition, ok := partitionOffsets[key]
95 if !ok {
96 partition = PartitionOffsets{
97 Partition: r.Partition,
98 FirstOffset: -1,
99 LastOffset: -1,
100 Offsets: make(map[int64]time.Time),
101 }
102 }
103
104 switch r.Timestamp {
105 case FirstOffset:
106 partition.FirstOffset = 0
107 case LastOffset:
108 partition.LastOffset = 0
109 }
110
111 partitionOffsets[topicPartition{
112 topic: topicName,
113 partition: r.Partition,
114 }] = partition
115 }
116 }
117
118 topics := make([]listoffsets.RequestTopic, 0, len(req.Topics))
119
120 for topicName, requests := range req.Topics {
121 partitions := make([]listoffsets.RequestPartition, len(requests))
122
123 for i, r := range requests {
124 partitions[i] = listoffsets.RequestPartition{
125 Partition: int32(r.Partition),
126 CurrentLeaderEpoch: -1,
127 Timestamp: r.Timestamp,
128 }
129 }
130
131 topics = append(topics, listoffsets.RequestTopic{
132 Topic: topicName,
133 Partitions: partitions,
134 })
135 }
136

Callers 2

Fetch (method) · 0.95
TestClientListOffsets (function) · 0.80

Calls 4

roundTrip (method) · 0.95
makeDuration (function) · 0.85
makeTime (function) · 0.70
Error (type alias) · 0.70

Tested by 1

TestClientListOffsets (function) · 0.64