A partition consumer can be closed and then created anew for the same topic and partition.
(t *testing.T)
| 332 | |
| 333 | // TestConsumerRecreate checks that a partition consumer can be closed and then created anew for the same topic and partition. |
| 334 | func TestConsumerRecreate(t *testing.T) { |
| 335 | // Given: a mock broker with handlers for metadata, offset and fetch requests on partition 0 of "my_topic". |
| 336 | broker0 := NewMockBroker(t, 0) |
| 337 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 338 | "MetadataRequest": NewMockMetadataResponse(t). |
| 339 | SetBroker(broker0.Addr(), broker0.BrokerID()). |
| 340 | SetLeader("my_topic", 0, broker0.BrokerID()), |
| 341 | "OffsetRequest": NewMockOffsetResponse(t). |
| 342 | SetOffset("my_topic", 0, OffsetOldest, 0). |
| 343 | SetOffset("my_topic", 0, OffsetNewest, 1000), |
| 344 | "FetchRequest": NewMockFetchResponse(t, 1). |
| 345 | SetMessage("my_topic", 0, 10, testMsg), |
| 346 | }) |
| 347 | |
| 348 | c, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) |
| 349 | if err != nil { |
| 350 | t.Fatal(err) |
| 351 | } |
| 352 | |
| 353 | pc, err := c.ConsumePartition("my_topic", 0, 10) |
| 354 | if err != nil { |
| 355 | t.Fatal(err) |
| 356 | } |
| 357 | assertMessageOffset(t, <-pc.Messages(), 10) // the first partition consumer receives a message before it is closed |
| 358 | |
| 359 | // When: the partition consumer is closed and ConsumePartition is called again with the same topic, partition and offset. |
| 360 | safeClose(t, pc) |
| 361 | pc, err = c.ConsumePartition("my_topic", 0, 10) |
| 362 | if err != nil { |
| 363 | t.Fatal(err) |
| 364 | } |
| 365 | |
| 366 | // Then: the first message received from the re-created partition consumer is checked against offset 10. |
| 367 | assertMessageOffset(t, <-pc.Messages(), 10) |
| 368 | |
| 369 | safeClose(t, pc) // teardown: partition consumer first, then the consumer, then the mock broker |
| 370 | safeClose(t, c) |
| 371 | broker0.Close() |
| 372 | } |
| 373 | |
| 374 | // An attempt to consume the same partition twice should fail. |
| 375 | func TestConsumerDuplicate(t *testing.T) { |
nothing calls this directly
no test coverage detected