(t *testing.T)
| 2186 | } |
| 2187 | |
| 2188 | func TestListOffsets(t *testing.T) { |
| 2189 | const topic = "my-topic" |
| 2190 | |
| 2191 | t.Run("returns offsets from a single broker", func(t *testing.T) { |
| 2192 | const partition = int32(0) |
| 2193 | const timestamp = int64(1690000000000) |
| 2194 | const expectedOffset = int64(42) |
| 2195 | |
| 2196 | broker := newMockBroker(t, 1) |
| 2197 | broker.SetHandlerByMap(map[string]MockResponse{ |
| 2198 | "OffsetRequest": NewMockOffsetResponse(t).SetOffset(topic, partition, timestamp, expectedOffset), |
| 2199 | "MetadataRequest": mockMetadataFor(t, broker).SetLeader(topic, partition, broker.BrokerID()), |
| 2200 | }) |
| 2201 | |
| 2202 | result, err := newTestAdmin(t, broker).ListOffsets(map[string]map[int32]int64{ |
| 2203 | topic: {partition: timestamp}, |
| 2204 | }, nil) |
| 2205 | require.NoError(t, err) |
| 2206 | |
| 2207 | info := result[topic][partition] |
| 2208 | require.NotNil(t, info) |
| 2209 | assert.Equal(t, ErrNoError, info.Err) |
| 2210 | assert.Equal(t, expectedOffset, info.Offset) |
| 2211 | }) |
| 2212 | |
| 2213 | t.Run("fans out across partition leaders", func(t *testing.T) { |
| 2214 | const offset1 = int64(100) |
| 2215 | const offset2 = int64(200) |
| 2216 | |
| 2217 | leader1 := newMockBroker(t, 1) |
| 2218 | leader2 := newMockBroker(t, 2) |
| 2219 | metadata := mockMetadataFor(t, leader1, leader2). |
| 2220 | SetLeader(topic, 0, leader1.BrokerID()). |
| 2221 | SetLeader(topic, 1, leader2.BrokerID()) |
| 2222 | |
| 2223 | leader1.SetHandlerByMap(map[string]MockResponse{ |
| 2224 | "MetadataRequest": metadata, |
| 2225 | "OffsetRequest": NewMockOffsetResponse(t).SetOffset(topic, 0, OffsetNewest, offset1), |
| 2226 | }) |
| 2227 | leader2.SetHandlerByMap(map[string]MockResponse{ |
| 2228 | "MetadataRequest": metadata, |
| 2229 | "OffsetRequest": NewMockOffsetResponse(t).SetOffset(topic, 1, OffsetNewest, offset2), |
| 2230 | }) |
| 2231 | |
| 2232 | result, err := newTestAdmin(t, leader1).ListOffsets(map[string]map[int32]int64{ |
| 2233 | topic: {0: OffsetNewest, 1: OffsetNewest}, |
| 2234 | }, nil) |
| 2235 | require.NoError(t, err) |
| 2236 | require.Contains(t, result, topic) |
| 2237 | assert.Equal(t, offset1, result[topic][0].Offset) |
| 2238 | assert.Equal(t, offset2, result[topic][1].Offset) |
| 2239 | }) |
| 2240 | |
| 2241 | t.Run("propagates IsolationLevel to the broker request", func(t *testing.T) { |
| 2242 | const partition = int32(0) |
| 2243 | |
| 2244 | broker := newMockBroker(t, 1) |
| 2245 | metadata := mockMetadataFor(t, broker).SetLeader(topic, partition, broker.BrokerID()) |
nothing calls this directly
no test coverage detected