Nothing bad happens if a partition consumer that has no leader assigned at the moment is closed.
(t *testing.T)
| 543 | // TestConsumerClosePartitionWithoutLeader checks that nothing bad happens if a |
| 544 | // partition consumer that has no leader assigned at the moment is closed. |
| 545 | func TestConsumerClosePartitionWithoutLeader(t *testing.T) { |
| 546 | // Given: a mock broker registered as leader of my_topic/0 that serves a message at offset 123. |
| 547 | broker0 := NewMockBroker(t, 100) |
| 548 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 549 | "MetadataRequest": NewMockMetadataResponse(t). |
| 550 | SetBroker(broker0.Addr(), broker0.BrokerID()). |
| 551 | SetLeader("my_topic", 0, broker0.BrokerID()), |
| 552 | "OffsetRequest": NewMockOffsetResponse(t). |
| 553 | SetOffset("my_topic", 0, OffsetOldest, 123). |
| 554 | SetOffset("my_topic", 0, OffsetNewest, 1000), |
| 555 | "FetchRequest": NewMockFetchResponse(t, 1). |
| 556 | SetMessage("my_topic", 0, 123, testMsg), |
| 557 | }) |
| 558 | |
| 559 | config := NewTestConfig() |
| 560 | config.Net.ReadTimeout = 100 * time.Millisecond |
| 561 | config.Consumer.Retry.Backoff = 100 * time.Millisecond |
| 562 | config.Consumer.Return.Errors = true |
| 563 | config.Metadata.Retry.Max = 0 |
| 564 | c, err := NewConsumer([]string{broker0.Addr()}, config) |
| 565 | if err != nil { |
| 566 | t.Fatal(err) |
| 567 | } |
| 568 | |
| 569 | pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest) |
| 570 | if err != nil { |
| 571 | t.Fatal(err) |
| 572 | } |
| 573 | |
| 574 | assertMessageOffset(t, <-pc.Messages(), 123) |
| 575 | |
| 576 | // broker0 now answers fetches with ErrNotLeaderForPartition for my_topic/0; the new handler map |
| 577 | // registers only FetchRequest, and the requests to retrieve metadata fail with network timeout. |
| 578 | fetchResponse2 := &FetchResponse{} |
| 579 | fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition) |
| 580 | |
| 581 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 582 | "FetchRequest": NewMockWrapper(fetchResponse2), |
| 583 | }) |
| 584 | |
| 585 | // When: the partition consumer reports an error; anything other than ErrOutOfBrokers fails the test. |
| 586 | if consErr := <-pc.Errors(); !errors.Is(consErr.Err, ErrOutOfBrokers) { |
| 587 | t.Errorf("Unexpected error: %v", consErr.Err) |
| 588 | } |
| 589 | |
| 590 | // Then: the partition consumer can be closed without any problem, followed by the consumer and the broker. |
| 591 | safeClose(t, pc) |
| 592 | safeClose(t, c) |
| 593 | broker0.Close() |
| 594 | } |
| 595 | |
| 596 | // If the initial offset passed on partition consumer creation is out of the |
| 597 | // actual offset range for the partition, then the partition consumer stops |
nothing calls this directly
no test coverage detected