MCPcopy
hub / github.com/IBM/sarama / TestConsumerOffsetManual

Function TestConsumerOffsetManual

consumer_test.go:30–90  ·  consumer_test.go::TestConsumerOffsetManual

If a particular offset is provided then messages are consumed starting from that offset.

(t *testing.T)

Source from the content-addressed store, hash-verified

28// If a particular offset is provided then messages are consumed starting from
29// that offset.
30func TestConsumerOffsetManual(t *testing.T) {
31 // Given
32 broker0 := NewMockBroker(t, 0)
33
34 manualOffset := int64(1234)
35 offsetNewest := int64(2345)
36 offsetNewestAfterFetchRequest := int64(3456)
37
38 mockFetchResponse := NewMockFetchResponse(t, 1)
39
40 // skipped because parseRecords(): offset < child.offset
41 mockFetchResponse.SetMessage("my_topic", 0, manualOffset-1, testMsg)
42
43 for i := range int64(10) {
44 mockFetchResponse.SetMessage("my_topic", 0, i+manualOffset, testMsg)
45 }
46
47 mockFetchResponse.SetHighWaterMark("my_topic", 0, offsetNewestAfterFetchRequest)
48
49 broker0.SetHandlerByMap(map[string]MockResponse{
50 "MetadataRequest": NewMockMetadataResponse(t).
51 SetBroker(broker0.Addr(), broker0.BrokerID()).
52 SetLeader("my_topic", 0, broker0.BrokerID()),
53 "OffsetRequest": NewMockOffsetResponse(t).
54 SetOffset("my_topic", 0, OffsetOldest, 0).
55 SetOffset("my_topic", 0, OffsetNewest, offsetNewest),
56 "FetchRequest": mockFetchResponse,
57 })
58
59 // When
60 master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig())
61 if err != nil {
62 t.Fatal(err)
63 }
64
65 consumer, err := master.ConsumePartition("my_topic", 0, manualOffset)
66 if err != nil {
67 t.Fatal(err)
68 }
69
70 // Then
71 if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest {
72 t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo)
73 }
74 for i := range int64(10) {
75 select {
76 case message := <-consumer.Messages():
77 assertMessageOffset(t, message, i+manualOffset)
78 case err := <-consumer.Errors():
79 t.Error(err)
80 }
81 }
82
83 if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewestAfterFetchRequest {
84 t.Errorf("Expected high water mark offset %d, found %d", offsetNewestAfterFetchRequest, hwmo)
85 }
86
87 safeClose(t, consumer)

Callers

nothing calls this directly

Calls 15

SetMessage · Method · 0.95
SetHighWaterMark · Method · 0.95
SetHandlerByMap · Method · 0.95
Addr · Method · 0.95
BrokerID · Method · 0.95
ConsumePartition · Method · 0.95
Close · Method · 0.95
NewMockBroker · Function · 0.85
NewMockFetchResponse · Function · 0.85
NewMockMetadataResponse · Function · 0.85
NewMockOffsetResponse · Function · 0.85
assertMessageOffset · Function · 0.85

Tested by

no test coverage detected