MCPcopy
hub / github.com/IBM/sarama / TestConsumerNonSequentialOffsets

Function TestConsumerNonSequentialOffsets

consumer_test.go:1168–1220  ·  view source on GitHub ↗

It is fine if offsets of fetched messages are not sequential (although strictly increasing!).

(t *testing.T)

Source from the content-addressed store, hash-verified

1166// It is fine if offsets of fetched messages are not sequential (although
1167// strictly increasing!).
1168func TestConsumerNonSequentialOffsets(t *testing.T) {
1169 // Given
1170 legacyFetchResponse := &FetchResponse{}
1171 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5)
1172 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7)
1173 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11)
1174 newFetchResponse := &FetchResponse{Version: 5}
1175 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5)
1176 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7)
1177 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11)
1178 newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11)
1179 newFetchResponse.SetLastStableOffset("my_topic", 0, 11)
1180 for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
1181 cfg := NewTestConfig()
1182 if fetchResponse1.Version >= 4 {
1183 cfg.Version = V0_11_0_0
1184 }
1185
1186 broker0 := NewMockBroker(t, 0)
1187 fetchResponse2 := &FetchResponse{Version: fetchResponse1.Version}
1188 fetchResponse2.AddError("my_topic", 0, ErrNoError)
1189 broker0.SetHandlerByMap(map[string]MockResponse{
1190 "MetadataRequest": NewMockMetadataResponse(t).
1191 SetBroker(broker0.Addr(), broker0.BrokerID()).
1192 SetLeader("my_topic", 0, broker0.BrokerID()),
1193 "OffsetRequest": NewMockOffsetResponse(t).
1194 SetOffset("my_topic", 0, OffsetNewest, 1234).
1195 SetOffset("my_topic", 0, OffsetOldest, 0),
1196 "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
1197 })
1198
1199 master, err := NewConsumer([]string{broker0.Addr()}, cfg)
1200 if err != nil {
1201 t.Fatal(err)
1202 }
1203
1204 // When
1205 consumer, err := master.ConsumePartition("my_topic", 0, 3)
1206 if err != nil {
1207 t.Fatal(err)
1208 }
1209
1210 // Then: messages with offsets 1 and 2 are not returned even though they
1211 // are present in the response.
1212 assertMessageOffset(t, <-consumer.Messages(), 5)
1213 assertMessageOffset(t, <-consumer.Messages(), 7)
1214 assertMessageOffset(t, <-consumer.Messages(), 11)
1215
1216 safeClose(t, consumer)
1217 safeClose(t, master)
1218 broker0.Close()
1219 }
1220}
1221
1222// If leadership for a partition is changing then consumer resolves the new
1223// leader and switches to it.

Callers

nothing calls this directly

Calls 15

AddMessageMethod · 0.95
AddRecordMethod · 0.95
SetLastOffsetDeltaMethod · 0.95
SetLastStableOffsetMethod · 0.95
AddErrorMethod · 0.95
SetHandlerByMapMethod · 0.95
AddrMethod · 0.95
BrokerIDMethod · 0.95
ConsumePartitionMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
NewMockMetadataResponseFunction · 0.85

Tested by

no test coverage detected