It is fine if the offsets of fetched messages are not sequential, as long as they are strictly increasing.
(t *testing.T)
| 1166 | // It is fine if offsets of fetched messages are not sequential (although |
| 1167 | // strictly increasing!). |
| 1168 | func TestConsumerNonSequentialOffsets(t *testing.T) { |
| 1169 | // Given |
| 1170 | legacyFetchResponse := &FetchResponse{} |
| 1171 | legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5) |
| 1172 | legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7) |
| 1173 | legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11) |
| 1174 | newFetchResponse := &FetchResponse{Version: 5} |
| 1175 | newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5) |
| 1176 | newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7) |
| 1177 | newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11) |
| 1178 | newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11) |
| 1179 | newFetchResponse.SetLastStableOffset("my_topic", 0, 11) |
| 1180 | for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} { |
| 1181 | cfg := NewTestConfig() |
| 1182 | if fetchResponse1.Version >= 4 { |
| 1183 | cfg.Version = V0_11_0_0 |
| 1184 | } |
| 1185 | |
| 1186 | broker0 := NewMockBroker(t, 0) |
| 1187 | fetchResponse2 := &FetchResponse{Version: fetchResponse1.Version} |
| 1188 | fetchResponse2.AddError("my_topic", 0, ErrNoError) |
| 1189 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 1190 | "MetadataRequest": NewMockMetadataResponse(t). |
| 1191 | SetBroker(broker0.Addr(), broker0.BrokerID()). |
| 1192 | SetLeader("my_topic", 0, broker0.BrokerID()), |
| 1193 | "OffsetRequest": NewMockOffsetResponse(t). |
| 1194 | SetOffset("my_topic", 0, OffsetNewest, 1234). |
| 1195 | SetOffset("my_topic", 0, OffsetOldest, 0), |
| 1196 | "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), |
| 1197 | }) |
| 1198 | |
| 1199 | master, err := NewConsumer([]string{broker0.Addr()}, cfg) |
| 1200 | if err != nil { |
| 1201 | t.Fatal(err) |
| 1202 | } |
| 1203 | |
| 1204 | // When |
| 1205 | consumer, err := master.ConsumePartition("my_topic", 0, 3) |
| 1206 | if err != nil { |
| 1207 | t.Fatal(err) |
| 1208 | } |
| 1209 | |
| 1210 | // Then: messages with offsets 1 and 2 are not returned even though they |
| 1211 | // are present in the response. |
| 1212 | assertMessageOffset(t, <-consumer.Messages(), 5) |
| 1213 | assertMessageOffset(t, <-consumer.Messages(), 7) |
| 1214 | assertMessageOffset(t, <-consumer.Messages(), 11) |
| 1215 | |
| 1216 | safeClose(t, consumer) |
| 1217 | safeClose(t, master) |
| 1218 | broker0.Close() |
| 1219 | } |
| 1220 | } |
| 1221 | |
| 1222 | // If leadership for a partition is changing then consumer resolves the new |
| 1223 | // leader and switches to it. |
nothing calls this directly
no test coverage detected