If `OffsetNewest` is passed as the initial offset, then the first consumed message indeed corresponds to the offset that the broker claims to be the newest in its metadata response.
(t *testing.T)
| 239 | // message indeed corresponds to the offset that broker claims to be the |
| 240 | // newest in its metadata response. |
| 241 | func TestConsumerOffsetNewest(t *testing.T) { |
| 242 | // Given |
| 243 | offsetNewest := int64(10) |
| 244 | offsetNewestAfterFetchRequest := int64(50) |
| 245 | broker0 := NewMockBroker(t, 0) |
| 246 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 247 | "MetadataRequest": NewMockMetadataResponse(t). |
| 248 | SetBroker(broker0.Addr(), broker0.BrokerID()). |
| 249 | SetLeader("my_topic", 0, broker0.BrokerID()), |
| 250 | "OffsetRequest": NewMockOffsetResponse(t). |
| 251 | SetOffset("my_topic", 0, OffsetNewest, offsetNewest). |
| 252 | SetOffset("my_topic", 0, OffsetOldest, 7), |
| 253 | "FetchRequest": NewMockFetchResponse(t, 1). |
| 254 | SetMessage("my_topic", 0, 9, testMsg). // skipped because parseRecords(): offset < child.offset |
| 255 | SetMessage("my_topic", 0, 10, testMsg). |
| 256 | SetMessage("my_topic", 0, 11, testMsg). |
| 257 | SetHighWaterMark("my_topic", 0, offsetNewestAfterFetchRequest), |
| 258 | }) |
| 259 | |
| 260 | master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) |
| 261 | if err != nil { |
| 262 | t.Fatal(err) |
| 263 | } |
| 264 | |
| 265 | // When |
| 266 | consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest) |
| 267 | if err != nil { |
| 268 | t.Fatal(err) |
| 269 | } |
| 270 | |
| 271 | // Then |
| 272 | if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest { |
| 273 | t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo) |
| 274 | } |
| 275 | assertMessageOffset(t, <-consumer.Messages(), 10) |
| 276 | if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewestAfterFetchRequest { |
| 277 | t.Errorf("Expected high water mark offset %d, found %d", offsetNewestAfterFetchRequest, hwmo) |
| 278 | } |
| 279 | |
| 280 | safeClose(t, consumer) |
| 281 | safeClose(t, master) |
| 282 | broker0.Close() |
| 283 | } |
| 284 | |
| 285 | // If `OffsetOldest` is passed as the initial offset then the first consumed |
| 286 | // message is indeed the first available in the partition. |
nothing calls this directly
no test coverage detected