(t *testing.T)
| 753 | } |
| 754 | |
| 755 | func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) { |
| 756 | // Given |
| 757 | fetchResponse1 := &FetchResponse{Version: 5} |
| 758 | fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) |
| 759 | fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2) |
| 760 | |
| 761 | cfg := NewTestConfig() |
| 762 | cfg.Version = V0_11_0_0 |
| 763 | |
| 764 | broker0 := NewMockBroker(t, 0) |
| 765 | fetchResponse2 := &FetchResponse{} |
| 766 | fetchResponse2.Version = 4 |
| 767 | fetchResponse2.AddError("my_topic", 0, ErrNoError) |
| 768 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 769 | "MetadataRequest": NewMockMetadataResponse(t). |
| 770 | SetBroker(broker0.Addr(), broker0.BrokerID()). |
| 771 | SetLeader("my_topic", 0, broker0.BrokerID()), |
| 772 | "OffsetRequest": NewMockOffsetResponse(t). |
| 773 | SetOffset("my_topic", 0, OffsetNewest, 1234). |
| 774 | SetOffset("my_topic", 0, OffsetOldest, 0), |
| 775 | "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), |
| 776 | }) |
| 777 | |
| 778 | master, err := NewConsumer([]string{broker0.Addr()}, cfg) |
| 779 | if err != nil { |
| 780 | t.Fatal(err) |
| 781 | } |
| 782 | |
| 783 | // When |
| 784 | consumer, err := master.ConsumePartition("my_topic", 0, 1) |
| 785 | if err != nil { |
| 786 | t.Fatal(err) |
| 787 | } |
| 788 | |
| 789 | assertMessageOffset(t, <-consumer.Messages(), 1) |
| 790 | assertMessageOffset(t, <-consumer.Messages(), 2) |
| 791 | |
| 792 | safeClose(t, consumer) |
| 793 | safeClose(t, master) |
| 794 | broker0.Close() |
| 795 | } |
| 796 | |
| 797 | func TestConsumeMessageWithSessionIDs(t *testing.T) { |
| 798 | // Given |
nothing calls this directly
no test coverage detected