MCPcopy
hub / github.com/IBM/sarama / TestConsumeMessageWithSessionIDs

Function TestConsumeMessageWithSessionIDs

consumer_test.go:797–843  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

795}
796
797func TestConsumeMessageWithSessionIDs(t *testing.T) {
798 // Given
799 fetchResponse1 := &FetchResponse{Version: 7}
800 fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
801 fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
802
803 cfg := NewTestConfig()
804 cfg.Version = V1_1_0_0
805
806 broker0 := NewMockBroker(t, 0)
807 fetchResponse2 := &FetchResponse{}
808 fetchResponse2.Version = 7
809 fetchResponse2.AddError("my_topic", 0, ErrNoError)
810
811 broker0.SetHandlerByMap(map[string]MockResponse{
812 "MetadataRequest": NewMockMetadataResponse(t).
813 SetBroker(broker0.Addr(), broker0.BrokerID()).
814 SetLeader("my_topic", 0, broker0.BrokerID()),
815 "OffsetRequest": NewMockOffsetResponse(t).
816 SetOffset("my_topic", 0, OffsetNewest, 1234).
817 SetOffset("my_topic", 0, OffsetOldest, 0),
818 "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
819 })
820
821 master, err := NewConsumer([]string{broker0.Addr()}, cfg)
822 if err != nil {
823 t.Fatal(err)
824 }
825
826 // When
827 consumer, err := master.ConsumePartition("my_topic", 0, 1)
828 if err != nil {
829 t.Fatal(err)
830 }
831
832 assertMessageOffset(t, <-consumer.Messages(), 1)
833 assertMessageOffset(t, <-consumer.Messages(), 2)
834
835 safeClose(t, consumer)
836 safeClose(t, master)
837 broker0.Close()
838
839 fetchReq := broker0.History()[3].Request.(*FetchRequest)
840 if fetchReq.SessionID != 0 || fetchReq.SessionEpoch != -1 {
841 t.Error("Expected session ID to be zero & Epoch to be -1")
842 }
843}
844
845func TestConsumeMessagesFromReadReplica(t *testing.T) {
846 withRefreshFrequency := func(frequency time.Duration) func(*Config) {

Callers

nothing calls this directly

Calls 15

AddMessage · Method · 0.95
AddError · Method · 0.95
SetHandlerByMap · Method · 0.95
Addr · Method · 0.95
BrokerID · Method · 0.95
ConsumePartition · Method · 0.95
Close · Method · 0.95
History · Method · 0.95
NewMockBroker · Function · 0.85
NewMockMetadataResponse · Function · 0.85
NewMockOffsetResponse · Function · 0.85
NewMockSequence · Function · 0.85

Tested by

no test coverage detected