MCPcopy
hub / github.com/IBM/sarama / TestConsumerMessageWithKey

Function TestConsumerMessageWithKey

consumer_test.go:93–155  ·  consumer_test.go::TestConsumerMessageWithKey

If a message is given a key, it can be correctly collected while consuming.

(t *testing.T)

Source from the content-addressed store, hash-verified

91
92// If a message is given a key, it can be correctly collected while consuming.
93func TestConsumerMessageWithKey(t *testing.T) {
94 // Given
95 broker0 := NewMockBroker(t, 0)
96
97 manualOffset := int64(1234)
98 offsetNewest := int64(2345)
99 offsetNewestAfterFetchRequest := int64(3456)
100
101 mockFetchResponse := NewMockFetchResponse(t, 1)
102
103 // skipped because parseRecords(): offset < child.offset
104 mockFetchResponse.SetMessageWithKey("my_topic", 0, manualOffset-1, testKey, testMsg)
105
106 for i := range int64(10) {
107 mockFetchResponse.SetMessageWithKey("my_topic", 0, i+manualOffset, testKey, testMsg)
108 }
109
110 mockFetchResponse.SetHighWaterMark("my_topic", 0, offsetNewestAfterFetchRequest)
111
112 broker0.SetHandlerByMap(map[string]MockResponse{
113 "MetadataRequest": NewMockMetadataResponse(t).
114 SetBroker(broker0.Addr(), broker0.BrokerID()).
115 SetLeader("my_topic", 0, broker0.BrokerID()),
116 "OffsetRequest": NewMockOffsetResponse(t).
117 SetOffset("my_topic", 0, OffsetOldest, 0).
118 SetOffset("my_topic", 0, OffsetNewest, offsetNewest),
119 "FetchRequest": mockFetchResponse,
120 })
121
122 // When
123 master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig())
124 if err != nil {
125 t.Fatal(err)
126 }
127
128 consumer, err := master.ConsumePartition("my_topic", 0, manualOffset)
129 if err != nil {
130 t.Fatal(err)
131 }
132
133 // Then
134 if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest {
135 t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo)
136 }
137 for i := range int64(10) {
138 select {
139 case message := <-consumer.Messages():
140 assertMessageOffset(t, message, i+manualOffset)
141 assertMessageKey(t, message, testKey)
142 assertMessageValue(t, message, testMsg)
143 case err := <-consumer.Errors():
144 t.Error(err)
145 }
146 }
147
148 if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewestAfterFetchRequest {
149 t.Errorf("Expected high water mark offset %d, found %d", offsetNewestAfterFetchRequest, hwmo)
150 }

Callers

nothing calls this directly

Calls 15

SetMessageWithKeyMethod · 0.95
SetHighWaterMarkMethod · 0.95
SetHandlerByMapMethod · 0.95
AddrMethod · 0.95
BrokerIDMethod · 0.95
ConsumePartitionMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
NewMockFetchResponseFunction · 0.85
NewMockMetadataResponseFunction · 0.85
NewMockOffsetResponseFunction · 0.85
assertMessageOffsetFunction · 0.85

Tested by

no test coverage detected