(t *testing.T)
| 324 | } |
| 325 | |
| 326 | func TestFuncAdminListConsumerGroups(t *testing.T) { |
| 327 | t.Parallel() |
| 328 | setupFunctionalTest(t) |
| 329 | defer teardownFunctionalTest(t) |
| 330 | |
| 331 | group1 := testFuncConsumerGroupID(t) |
| 332 | group2 := testFuncConsumerGroupID(t) |
| 333 | |
| 334 | config := NewFunctionalTestConfig() |
| 335 | adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 336 | if err != nil { |
| 337 | t.Fatal(err) |
| 338 | } |
| 339 | defer safeClose(t, adminClient) |
| 340 | |
| 341 | config1 := NewFunctionalTestConfig() |
| 342 | config1.ClientID = "M1" |
| 343 | config1.Consumer.Offsets.Initial = OffsetNewest |
| 344 | m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, group1, 100, nil, "test.4") |
| 345 | defer m1.Close() |
| 346 | |
| 347 | config2 := NewFunctionalTestConfig() |
| 348 | config2.ClientID = "M2" |
| 349 | config2.Consumer.Offsets.Initial = OffsetNewest |
| 350 | config2.Consumer.Group.InstanceId = "Instance2" |
| 351 | m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, group2, 100, nil, "test.4") |
| 352 | defer m2.Close() |
| 353 | |
| 354 | m1.WaitForState(2) |
| 355 | m2.WaitForState(2) |
| 356 | |
| 357 | res, err := adminClient.ListConsumerGroups() |
| 358 | if err != nil { |
| 359 | t.Fatal(err) |
| 360 | } |
| 361 | assert.GreaterOrEqual(t, len(res), 2) |
| 362 | assert.Contains(t, slices.Collect(maps.Keys(res)), group1) |
| 363 | assert.Contains(t, slices.Collect(maps.Keys(res)), group2) |
| 364 | |
| 365 | m1.AssertCleanShutdown() |
| 366 | m2.AssertCleanShutdown() |
| 367 | } |
| 368 | |
| 369 | func TestFuncAdminListConsumerGroupOffsets(t *testing.T) { |
| 370 | checkKafkaVersion(t, "0.8.2.0") |
nothing calls this directly
no test coverage detected