MCPcopy
hub / github.com/IBM/sarama / TestConsumerOffsetOldest

Function TestConsumerOffsetOldest

consumer_test.go:287–331  ·  view source on GitHub ↗

If `OffsetOldest` is passed as the initial offset then the first consumed message is indeed the first available in the partition.

(t *testing.T)

Source from the content-addressed store, hash-verified

285// If `OffsetOldest` is passed as the initial offset then the first consumed
286// message is indeed the first available in the partition.
287func TestConsumerOffsetOldest(t *testing.T) {
288 // Given
289 offsetNewest := int64(10)
290 broker0 := NewMockBroker(t, 0)
291 broker0.SetHandlerByMap(map[string]MockResponse{
292 "MetadataRequest": NewMockMetadataResponse(t).
293 SetBroker(broker0.Addr(), broker0.BrokerID()).
294 SetLeader("my_topic", 0, broker0.BrokerID()),
295 "OffsetRequest": NewMockOffsetResponse(t).
296 SetOffset("my_topic", 0, OffsetNewest, offsetNewest).
297 SetOffset("my_topic", 0, OffsetOldest, 7),
298 "FetchRequest": NewMockFetchResponse(t, 1).
299 // skipped because parseRecords(): offset < child.offset
300 SetMessage("my_topic", 0, 6, testMsg).
301 // these will get to the Messages() channel
302 SetMessage("my_topic", 0, 7, testMsg).
303 SetMessage("my_topic", 0, 8, testMsg).
304 SetMessage("my_topic", 0, 9, testMsg).
305 SetHighWaterMark("my_topic", 0, offsetNewest),
306 })
307
308 master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig())
309 if err != nil {
310 t.Fatal(err)
311 }
312
313 // When
314 consumer, err := master.ConsumePartition("my_topic", 0, OffsetOldest)
315 if err != nil {
316 t.Fatal(err)
317 }
318
319 // Then
320 if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest {
321 t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo)
322 }
323 assertMessageOffset(t, <-consumer.Messages(), int64(7))
324 if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest {
325 t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo)
326 }
327
328 safeClose(t, consumer)
329 safeClose(t, master)
330 broker0.Close()
331}
332
333// It is possible to close a partition consumer and create the same anew.
334func TestConsumerRecreate(t *testing.T) {

Callers

nothing calls this directly

Calls 15

SetHandlerByMapMethod · 0.95
AddrMethod · 0.95
BrokerIDMethod · 0.95
ConsumePartitionMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
NewMockMetadataResponseFunction · 0.85
NewMockOffsetResponseFunction · 0.85
NewMockFetchResponseFunction · 0.85
assertMessageOffsetFunction · 0.85
SetLeaderMethod · 0.80
SetBrokerMethod · 0.80

Tested by

no test coverage detected