If a fetch response contains messages with offsets that are smaller than requested, then such messages are ignored.
(t *testing.T)
| 635 | // If a fetch response contains messages with offsets that are smaller than |
| 636 | // requested, then such messages are ignored. |
| 637 | func TestConsumerExtraOffsets(t *testing.T) { |
| 638 | // Given |
| 639 | legacyFetchResponse := &FetchResponse{} |
| 640 | legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1) |
| 641 | legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2) |
| 642 | legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3) |
| 643 | legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4) |
| 644 | newFetchResponse := &FetchResponse{Version: 5} |
| 645 | newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1) |
| 646 | newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2) |
| 647 | newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3) |
| 648 | newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4) |
| 649 | newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4) |
| 650 | newFetchResponse.SetLastStableOffset("my_topic", 0, 4) |
| 651 | for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} { |
| 652 | cfg := NewTestConfig() |
| 653 | cfg.Consumer.Return.Errors = true |
| 654 | if fetchResponse1.Version >= 5 { |
| 655 | cfg.Version = V0_11_0_0 |
| 656 | } |
| 657 | |
| 658 | broker0 := NewMockBroker(t, 0) |
| 659 | fetchResponse2 := &FetchResponse{} |
| 660 | fetchResponse2.Version = fetchResponse1.Version |
| 661 | fetchResponse2.AddError("my_topic", 0, ErrNoError) |
| 662 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 663 | "MetadataRequest": NewMockMetadataResponse(t). |
| 664 | SetBroker(broker0.Addr(), broker0.BrokerID()). |
| 665 | SetLeader("my_topic", 0, broker0.BrokerID()), |
| 666 | "OffsetRequest": NewMockOffsetResponse(t). |
| 667 | SetOffset("my_topic", 0, OffsetNewest, 1234). |
| 668 | SetOffset("my_topic", 0, OffsetOldest, 0), |
| 669 | "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), |
| 670 | }) |
| 671 | |
| 672 | master, err := NewConsumer([]string{broker0.Addr()}, cfg) |
| 673 | if err != nil { |
| 674 | t.Fatal(err) |
| 675 | } |
| 676 | |
| 677 | // When |
| 678 | consumer, err := master.ConsumePartition("my_topic", 0, 3) |
| 679 | if err != nil { |
| 680 | t.Fatal(err) |
| 681 | } |
| 682 | |
| 683 | // Then: messages with offsets 1 and 2 are not returned even though they |
| 684 | // are present in the response. |
| 685 | select { |
| 686 | case msg := <-consumer.Messages(): |
| 687 | assertMessageOffset(t, msg, 3) |
| 688 | case err := <-consumer.Errors(): |
| 689 | t.Fatal(err) |
| 690 | } |
| 691 | |
| 692 | select { |
| 693 | case msg := <-consumer.Messages(): |
| 694 | assertMessageOffset(t, msg, 4) |
nothing calls this directly
no test coverage detected