ListOffsets sends an offset request to a kafka broker and returns the response.
(ctx context.Context, req *ListOffsetsRequest)
| 77 | // ListOffsets sends an offset request to a kafka broker and returns the |
| 78 | // response. |
| 79 | func (c *Client) ListOffsets(ctx context.Context, req *ListOffsetsRequest) (*ListOffsetsResponse, error) { |
| 80 | type topicPartition struct { |
| 81 | topic string |
| 82 | partition int |
| 83 | } |
| 84 | |
| 85 | partitionOffsets := make(map[topicPartition]PartitionOffsets) |
| 86 | |
| 87 | for topicName, requests := range req.Topics { |
| 88 | for _, r := range requests { |
| 89 | key := topicPartition{ |
| 90 | topic: topicName, |
| 91 | partition: r.Partition, |
| 92 | } |
| 93 | |
| 94 | partition, ok := partitionOffsets[key] |
| 95 | if !ok { |
| 96 | partition = PartitionOffsets{ |
| 97 | Partition: r.Partition, |
| 98 | FirstOffset: -1, |
| 99 | LastOffset: -1, |
| 100 | Offsets: make(map[int64]time.Time), |
| 101 | } |
| 102 | } |
| 103 | |
| 104 | switch r.Timestamp { |
| 105 | case FirstOffset: |
| 106 | partition.FirstOffset = 0 |
| 107 | case LastOffset: |
| 108 | partition.LastOffset = 0 |
| 109 | } |
| 110 | |
| 111 | partitionOffsets[topicPartition{ |
| 112 | topic: topicName, |
| 113 | partition: r.Partition, |
| 114 | }] = partition |
| 115 | } |
| 116 | } |
| 117 | |
| 118 | topics := make([]listoffsets.RequestTopic, 0, len(req.Topics)) |
| 119 | |
| 120 | for topicName, requests := range req.Topics { |
| 121 | partitions := make([]listoffsets.RequestPartition, len(requests)) |
| 122 | |
| 123 | for i, r := range requests { |
| 124 | partitions[i] = listoffsets.RequestPartition{ |
| 125 | Partition: int32(r.Partition), |
| 126 | CurrentLeaderEpoch: -1, |
| 127 | Timestamp: r.Timestamp, |
| 128 | } |
| 129 | } |
| 130 | |
| 131 | topics = append(topics, listoffsets.RequestTopic{ |
| 132 | Topic: topicName, |
| 133 | Partitions: partitions, |
| 134 | }) |
| 135 | } |
| 136 |