MCPcopy
hub / github.com/IBM/sarama / TestConsumerOffsetNewest

Function TestConsumerOffsetNewest

consumer_test.go:241–283  ·  view source on GitHub ↗

If `OffsetNewest` is passed as the initial offset, then the first consumed message indeed corresponds to the offset that the broker claims to be the newest in its metadata response.

(t *testing.T)

Source from the content-addressed store, hash-verified

239// message indeed corresponds to the offset that broker claims to be the
240// newest in its metadata response.
241func TestConsumerOffsetNewest(t *testing.T) {
242 // Given
243 offsetNewest := int64(10)
244 offsetNewestAfterFetchRequest := int64(50)
245 broker0 := NewMockBroker(t, 0)
246 broker0.SetHandlerByMap(map[string]MockResponse{
247 "MetadataRequest": NewMockMetadataResponse(t).
248 SetBroker(broker0.Addr(), broker0.BrokerID()).
249 SetLeader("my_topic", 0, broker0.BrokerID()),
250 "OffsetRequest": NewMockOffsetResponse(t).
251 SetOffset("my_topic", 0, OffsetNewest, offsetNewest).
252 SetOffset("my_topic", 0, OffsetOldest, 7),
253 "FetchRequest": NewMockFetchResponse(t, 1).
254 SetMessage("my_topic", 0, 9, testMsg). // skipped because parseRecords(): offset < child.offset
255 SetMessage("my_topic", 0, 10, testMsg).
256 SetMessage("my_topic", 0, 11, testMsg).
257 SetHighWaterMark("my_topic", 0, offsetNewestAfterFetchRequest),
258 })
259
260 master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig())
261 if err != nil {
262 t.Fatal(err)
263 }
264
265 // When
266 consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
267 if err != nil {
268 t.Fatal(err)
269 }
270
271 // Then
272 if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest {
273 t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo)
274 }
275 assertMessageOffset(t, <-consumer.Messages(), 10)
276 if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewestAfterFetchRequest {
277 t.Errorf("Expected high water mark offset %d, found %d", offsetNewestAfterFetchRequest, hwmo)
278 }
279
280 safeClose(t, consumer)
281 safeClose(t, master)
282 broker0.Close()
283}
284
285// If `OffsetOldest` is passed as the initial offset then the first consumed
286// message is indeed the first available in the partition.

Callers

nothing calls this directly

Calls 15

SetHandlerByMapMethod · 0.95
AddrMethod · 0.95
BrokerIDMethod · 0.95
ConsumePartitionMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
NewMockMetadataResponseFunction · 0.85
NewMockOffsetResponseFunction · 0.85
NewMockFetchResponseFunction · 0.85
assertMessageOffsetFunction · 0.85
SetLeaderMethod · 0.80
SetBrokerMethod · 0.80

Tested by

no test coverage detected