MCPcopy
hub / github.com/IBM/sarama / TestConsumerReceivingFetchResponseWithTooOldRecords

Function TestConsumerReceivingFetchResponseWithTooOldRecords

consumer_test.go:708–753  ·  view source on GitHub ↗

In some situations broker may return a block containing only messages older then requested, even though there would be more messages if higher offset was requested.

(t *testing.T)

Source from the content-addressed store, hash-verified

706// messages older then requested, even though there would be
707// more messages if higher offset was requested.
708func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) {
709 // Given
710 fetchResponse1 := &FetchResponse{Version: 5}
711 fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1)
712
713 fetchResponse2 := &FetchResponse{Version: 5}
714 fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000)
715
716 cfg := NewTestConfig()
717 cfg.Consumer.Return.Errors = true
718 cfg.Version = V0_11_0_0
719
720 broker0 := NewMockBroker(t, 0)
721
722 broker0.SetHandlerByMap(map[string]MockResponse{
723 "MetadataRequest": NewMockMetadataResponse(t).
724 SetBroker(broker0.Addr(), broker0.BrokerID()).
725 SetLeader("my_topic", 0, broker0.BrokerID()),
726 "OffsetRequest": NewMockOffsetResponse(t).
727 SetOffset("my_topic", 0, OffsetNewest, 1234).
728 SetOffset("my_topic", 0, OffsetOldest, 0),
729 "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
730 })
731
732 master, err := NewConsumer([]string{broker0.Addr()}, cfg)
733 if err != nil {
734 t.Fatal(err)
735 }
736
737 // When
738 consumer, err := master.ConsumePartition("my_topic", 0, 2)
739 if err != nil {
740 t.Fatal(err)
741 }
742
743 select {
744 case msg := <-consumer.Messages():
745 assertMessageOffset(t, msg, 1000000)
746 case err := <-consumer.Errors():
747 t.Fatal(err)
748 }
749
750 safeClose(t, consumer)
751 safeClose(t, master)
752 broker0.Close()
753}
754
755func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) {
756 // Given

Callers

nothing calls this directly

Calls 15

AddRecordMethod · 0.95
SetHandlerByMapMethod · 0.95
AddrMethod · 0.95
BrokerIDMethod · 0.95
ConsumePartitionMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
NewMockMetadataResponseFunction · 0.85
NewMockOffsetResponseFunction · 0.85
NewMockSequenceFunction · 0.85
assertMessageOffsetFunction · 0.85
SetLeaderMethod · 0.80

Tested by

no test coverage detected