MCPcopy
hub / github.com/IBM/sarama / TestFuncConsumerGroupOffsetDeletion

Function TestFuncConsumerGroupOffsetDeletion

functional_consumer_group_test.go:243–299  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

241}
242
243func TestFuncConsumerGroupOffsetDeletion(t *testing.T) {
244 t.Parallel()
245 checkKafkaVersion(t, "2.4.0")
246 setupFunctionalTest(t)
247 defer teardownFunctionalTest(t)
248 config := NewFunctionalTestConfig()
249 client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
250 defer safeClose(t, client)
251 if err != nil {
252 t.Fatal(err)
253 }
254
255 // create a consumer group with offsets on
256 // - topic test.1 partition 0
257 // - topic test.4 partition 0
258 groupID := testFuncConsumerGroupID(t)
259 consumerGroup, err := NewConsumerGroupFromClient(groupID, client)
260 if err != nil {
261 t.Fatal(err)
262 }
263 defer safeClose(t, consumerGroup)
264
265 offsetMgr, _ := NewOffsetManagerFromClient(groupID, client)
266 defer safeClose(t, offsetMgr)
267 markOffset(t, offsetMgr, "test.1", 0, 1)
268 markOffset(t, offsetMgr, "test.4", 0, 2)
269 offsetMgr.Commit()
270
271 admin, err := NewClusterAdminFromClient(client)
272 if err != nil {
273 t.Fatal(err)
274 }
275 offsetFetch, err := admin.ListConsumerGroupOffsets(groupID, nil)
276 if err != nil {
277 t.Fatal(err)
278 }
279 if len(offsetFetch.Blocks) != 2 {
280 t.Fatal("Expected offsets on two topics. Found offsets on ", len(offsetFetch.Blocks), "topics.")
281 }
282
283 // Delete offset for partition topic test.4 partition 0
284 err = admin.DeleteConsumerGroupOffset(groupID, "test.4", 0)
285 if err != nil {
286 t.Fatal(err)
287 }
288
289 offsetFetch, err = admin.ListConsumerGroupOffsets(groupID, nil)
290 if err != nil {
291 t.Fatal(err)
292 }
293 if len(offsetFetch.Blocks) != 1 {
294 t.Fatal("Expected offsets on one topic. Found offsets on ", len(offsetFetch.Blocks), "topics.")
295 }
296 if offsetFetch.Blocks["test.4"] != nil {
297 t.Fatal("Offset still exists for topic 'topic.4'. It should have been deleted.")
298 }
299}
300

Callers

nothing calls this directly

Calls 15

checkKafkaVersionFunction · 0.85
setupFunctionalTestFunction · 0.85
teardownFunctionalTestFunction · 0.85
NewFunctionalTestConfigFunction · 0.85
testFuncConsumerGroupIDFunction · 0.85
markOffsetFunction · 0.85
FatalMethod · 0.80
NewClientFunction · 0.70
safeCloseFunction · 0.70

Tested by

no test coverage detected