MCPcopy
hub / github.com/IBM/sarama / TestConsumerExtraOffsets

Function TestConsumerExtraOffsets

consumer_test.go:637–703  ·  view source on GitHub ↗

If a fetch response contains messages with offsets that are smaller than requested, then such messages are ignored.

(t *testing.T)

Source from the content-addressed store, hash-verified

635// If a fetch response contains messages with offsets that are smaller than
636// requested, then such messages are ignored.
637func TestConsumerExtraOffsets(t *testing.T) {
638 // Given
639 legacyFetchResponse := &FetchResponse{}
640 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1)
641 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2)
642 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3)
643 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4)
644 newFetchResponse := &FetchResponse{Version: 5}
645 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1)
646 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2)
647 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3)
648 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4)
649 newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4)
650 newFetchResponse.SetLastStableOffset("my_topic", 0, 4)
651 for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
652 cfg := NewTestConfig()
653 cfg.Consumer.Return.Errors = true
654 if fetchResponse1.Version >= 5 {
655 cfg.Version = V0_11_0_0
656 }
657
658 broker0 := NewMockBroker(t, 0)
659 fetchResponse2 := &FetchResponse{}
660 fetchResponse2.Version = fetchResponse1.Version
661 fetchResponse2.AddError("my_topic", 0, ErrNoError)
662 broker0.SetHandlerByMap(map[string]MockResponse{
663 "MetadataRequest": NewMockMetadataResponse(t).
664 SetBroker(broker0.Addr(), broker0.BrokerID()).
665 SetLeader("my_topic", 0, broker0.BrokerID()),
666 "OffsetRequest": NewMockOffsetResponse(t).
667 SetOffset("my_topic", 0, OffsetNewest, 1234).
668 SetOffset("my_topic", 0, OffsetOldest, 0),
669 "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
670 })
671
672 master, err := NewConsumer([]string{broker0.Addr()}, cfg)
673 if err != nil {
674 t.Fatal(err)
675 }
676
677 // When
678 consumer, err := master.ConsumePartition("my_topic", 0, 3)
679 if err != nil {
680 t.Fatal(err)
681 }
682
683 // Then: messages with offsets 1 and 2 are not returned even though they
684 // are present in the response.
685 select {
686 case msg := <-consumer.Messages():
687 assertMessageOffset(t, msg, 3)
688 case err := <-consumer.Errors():
689 t.Fatal(err)
690 }
691
692 select {
693 case msg := <-consumer.Messages():
694 assertMessageOffset(t, msg, 4)

Callers

nothing calls this directly

Calls 15

AddMessageMethod · 0.95
AddRecordMethod · 0.95
SetLastOffsetDeltaMethod · 0.95
SetLastStableOffsetMethod · 0.95
AddErrorMethod · 0.95
SetHandlerByMapMethod · 0.95
AddrMethod · 0.95
BrokerIDMethod · 0.95
ConsumePartitionMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
NewMockMetadataResponseFunction · 0.85

Tested by

no test coverage detected