In some situations the broker may return a block containing only messages older than requested, even though there would be more messages if a higher offset was requested.
(t *testing.T)
| 706 | // messages older then requested, even though there would be |
| 707 | // more messages if higher offset was requested. |
| 708 | func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) { |
| 709 | // Given |
| 710 | fetchResponse1 := &FetchResponse{Version: 5} |
| 711 | fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1) |
| 712 | |
| 713 | fetchResponse2 := &FetchResponse{Version: 5} |
| 714 | fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000) |
| 715 | |
| 716 | cfg := NewTestConfig() |
| 717 | cfg.Consumer.Return.Errors = true |
| 718 | cfg.Version = V0_11_0_0 |
| 719 | |
| 720 | broker0 := NewMockBroker(t, 0) |
| 721 | |
| 722 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 723 | "MetadataRequest": NewMockMetadataResponse(t). |
| 724 | SetBroker(broker0.Addr(), broker0.BrokerID()). |
| 725 | SetLeader("my_topic", 0, broker0.BrokerID()), |
| 726 | "OffsetRequest": NewMockOffsetResponse(t). |
| 727 | SetOffset("my_topic", 0, OffsetNewest, 1234). |
| 728 | SetOffset("my_topic", 0, OffsetOldest, 0), |
| 729 | "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), |
| 730 | }) |
| 731 | |
| 732 | master, err := NewConsumer([]string{broker0.Addr()}, cfg) |
| 733 | if err != nil { |
| 734 | t.Fatal(err) |
| 735 | } |
| 736 | |
| 737 | // When |
| 738 | consumer, err := master.ConsumePartition("my_topic", 0, 2) |
| 739 | if err != nil { |
| 740 | t.Fatal(err) |
| 741 | } |
| 742 | |
| 743 | select { |
| 744 | case msg := <-consumer.Messages(): |
| 745 | assertMessageOffset(t, msg, 1000000) |
| 746 | case err := <-consumer.Errors(): |
| 747 | t.Fatal(err) |
| 748 | } |
| 749 | |
| 750 | safeClose(t, consumer) |
| 751 | safeClose(t, master) |
| 752 | broker0.Close() |
| 753 | } |
| 754 | |
| 755 | func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) { |
| 756 | // Given |
nothing calls this directly
no test coverage detected