If `OffsetOldest` is passed as the initial offset then the first consumed message is indeed the first available in the partition.
(t *testing.T)
| 285 | // If `OffsetOldest` is passed as the initial offset then the first consumed |
| 286 | // message is indeed the first available in the partition. |
| 287 | func TestConsumerOffsetOldest(t *testing.T) { |
| 288 | // Given |
| 289 | offsetNewest := int64(10) |
| 290 | broker0 := NewMockBroker(t, 0) |
| 291 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 292 | "MetadataRequest": NewMockMetadataResponse(t). |
| 293 | SetBroker(broker0.Addr(), broker0.BrokerID()). |
| 294 | SetLeader("my_topic", 0, broker0.BrokerID()), |
| 295 | "OffsetRequest": NewMockOffsetResponse(t). |
| 296 | SetOffset("my_topic", 0, OffsetNewest, offsetNewest). |
| 297 | SetOffset("my_topic", 0, OffsetOldest, 7), |
| 298 | "FetchRequest": NewMockFetchResponse(t, 1). |
| 299 | // skipped because parseRecords(): offset < child.offset |
| 300 | SetMessage("my_topic", 0, 6, testMsg). |
| 301 | // these will get to the Messages() channel |
| 302 | SetMessage("my_topic", 0, 7, testMsg). |
| 303 | SetMessage("my_topic", 0, 8, testMsg). |
| 304 | SetMessage("my_topic", 0, 9, testMsg). |
| 305 | SetHighWaterMark("my_topic", 0, offsetNewest), |
| 306 | }) |
| 307 | |
| 308 | master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) |
| 309 | if err != nil { |
| 310 | t.Fatal(err) |
| 311 | } |
| 312 | |
| 313 | // When |
| 314 | consumer, err := master.ConsumePartition("my_topic", 0, OffsetOldest) |
| 315 | if err != nil { |
| 316 | t.Fatal(err) |
| 317 | } |
| 318 | |
| 319 | // Then |
| 320 | if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest { |
| 321 | t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo) |
| 322 | } |
| 323 | assertMessageOffset(t, <-consumer.Messages(), int64(7)) |
| 324 | if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest { |
| 325 | t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo) |
| 326 | } |
| 327 | |
| 328 | safeClose(t, consumer) |
| 329 | safeClose(t, master) |
| 330 | broker0.Close() |
| 331 | } |
| 332 | |
| 333 | // It is possible to close a partition consumer and create the same anew. |
| 334 | func TestConsumerRecreate(t *testing.T) { |
nothing calls this directly
no test coverage detected