MCPcopy
hub / github.com/IBM/sarama / TestPauseResumeConsumption

Function TestPauseResumeConsumption

consumer_test.go:157–236  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

155}
156
157func TestPauseResumeConsumption(t *testing.T) {
158 // Given
159 broker0 := NewMockBroker(t, 0)
160
161 const newestOffsetBroker = 1233
162 const maxOffsetBroker = newestOffsetBroker + 10
163 offsetBroker := newestOffsetBroker
164 offsetClient := offsetBroker
165
166 mockFetchResponse := NewMockFetchResponse(t, 1)
167 mockFetchResponse.SetMessage("my_topic", 0, int64(newestOffsetBroker), testMsg)
168 offsetBroker++
169
170 brokerResponses := map[string]MockResponse{
171 "MetadataRequest": NewMockMetadataResponse(t).
172 SetBroker(broker0.Addr(), broker0.BrokerID()).
173 SetLeader("my_topic", 0, broker0.BrokerID()),
174 "OffsetRequest": NewMockOffsetResponse(t).
175 SetOffset("my_topic", 0, OffsetOldest, 0).
176 SetOffset("my_topic", 0, OffsetNewest, int64(newestOffsetBroker)),
177 "FetchRequest": mockFetchResponse,
178 }
179
180 broker0.SetHandlerByMap(brokerResponses)
181
182 // When
183 master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig())
184 if err != nil {
185 t.Fatal(err)
186 }
187
188 consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
189 if err != nil {
190 t.Fatal(err)
191 }
192
193 // pause the consumption
194 consumer.Pause()
195
196 // set more msgs on broker
197 for ; offsetBroker < maxOffsetBroker; offsetBroker++ {
198 mockFetchResponse = mockFetchResponse.SetMessage("my_topic", 0, int64(offsetBroker), testMsg)
199 }
200 brokerResponses["FetchRequest"] = mockFetchResponse
201 broker0.SetHandlerByMap(brokerResponses)
202
203 keepConsuming := true
204 for keepConsuming {
205 select {
206 case message := <-consumer.Messages():
207 // only the first msg is expected to be consumed
208 offsetClient++
209 assertMessageOffset(t, message, int64(newestOffsetBroker))
210 case err := <-consumer.Errors():
211 t.Fatal(err)
212 case <-time.After(time.Second):
213 // expected to time out once the consumption is paused
214 keepConsuming = false

Callers

nothing calls this directly

Calls 15

SetMessageMethod · 0.95
AddrMethod · 0.95
BrokerIDMethod · 0.95
SetHandlerByMapMethod · 0.95
ConsumePartitionMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
NewMockFetchResponseFunction · 0.85
NewMockMetadataResponseFunction · 0.85
NewMockOffsetResponseFunction · 0.85
assertMessageOffsetFunction · 0.85
SetLeaderMethod · 0.80

Tested by

no test coverage detected