(t *testing.T)
| 795 | } |
| 796 | |
| 797 | func TestConsumeMessageWithSessionIDs(t *testing.T) { |
| 798 | // Given |
| 799 | fetchResponse1 := &FetchResponse{Version: 7} |
| 800 | fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) |
| 801 | fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2) |
| 802 | |
| 803 | cfg := NewTestConfig() |
| 804 | cfg.Version = V1_1_0_0 |
| 805 | |
| 806 | broker0 := NewMockBroker(t, 0) |
| 807 | fetchResponse2 := &FetchResponse{} |
| 808 | fetchResponse2.Version = 7 |
| 809 | fetchResponse2.AddError("my_topic", 0, ErrNoError) |
| 810 | |
| 811 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 812 | "MetadataRequest": NewMockMetadataResponse(t). |
| 813 | SetBroker(broker0.Addr(), broker0.BrokerID()). |
| 814 | SetLeader("my_topic", 0, broker0.BrokerID()), |
| 815 | "OffsetRequest": NewMockOffsetResponse(t). |
| 816 | SetOffset("my_topic", 0, OffsetNewest, 1234). |
| 817 | SetOffset("my_topic", 0, OffsetOldest, 0), |
| 818 | "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), |
| 819 | }) |
| 820 | |
| 821 | master, err := NewConsumer([]string{broker0.Addr()}, cfg) |
| 822 | if err != nil { |
| 823 | t.Fatal(err) |
| 824 | } |
| 825 | |
| 826 | // When |
| 827 | consumer, err := master.ConsumePartition("my_topic", 0, 1) |
| 828 | if err != nil { |
| 829 | t.Fatal(err) |
| 830 | } |
| 831 | |
| 832 | assertMessageOffset(t, <-consumer.Messages(), 1) |
| 833 | assertMessageOffset(t, <-consumer.Messages(), 2) |
| 834 | |
| 835 | safeClose(t, consumer) |
| 836 | safeClose(t, master) |
| 837 | broker0.Close() |
| 838 | |
| 839 | fetchReq := broker0.History()[3].Request.(*FetchRequest) |
| 840 | if fetchReq.SessionID != 0 || fetchReq.SessionEpoch != -1 { |
| 841 | t.Error("Expected session ID to be zero & Epoch to be -1") |
| 842 | } |
| 843 | } |
| 844 | |
| 845 | func TestConsumeMessagesFromReadReplica(t *testing.T) { |
| 846 | withRefreshFrequency := func(frequency time.Duration) func(*Config) { |
nothing calls this directly
no test coverage detected