(t *testing.T)
| 367 | } |
| 368 | |
| 369 | func TestFuncAdminListConsumerGroupOffsets(t *testing.T) { |
| 370 | checkKafkaVersion(t, "0.8.2.0") |
| 371 | setupFunctionalTest(t) |
| 372 | defer teardownFunctionalTest(t) |
| 373 | |
| 374 | config := NewFunctionalTestConfig() |
| 375 | config.ClientID = t.Name() |
| 376 | client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 377 | defer safeClose(t, client) |
| 378 | if err != nil { |
| 379 | t.Fatal(err) |
| 380 | } |
| 381 | |
| 382 | group := testFuncConsumerGroupID(t) |
| 383 | consumerGroup, err := NewConsumerGroupFromClient(group, client) |
| 384 | if err != nil { |
| 385 | t.Fatal(err) |
| 386 | } |
| 387 | defer safeClose(t, consumerGroup) |
| 388 | |
| 389 | offsetMgr, _ := NewOffsetManagerFromClient(group, client) |
| 390 | defer safeClose(t, offsetMgr) |
| 391 | markOffset(t, offsetMgr, "test.4", 0, 2) |
| 392 | offsetMgr.Commit() |
| 393 | |
| 394 | coordinator, err := client.Coordinator(group) |
| 395 | if err != nil { |
| 396 | t.Fatal(err) |
| 397 | } |
| 398 | |
| 399 | t.Logf("coordinator broker %d", coordinator.id) |
| 400 | |
| 401 | adminClient, err := NewClusterAdminFromClient(client) |
| 402 | if err != nil { |
| 403 | t.Fatal(err) |
| 404 | } |
| 405 | { |
| 406 | resp, err := adminClient.ListConsumerGroupOffsets(group, map[string][]int32{"test.4": {0, 1, 2, 3}}) |
| 407 | if err != nil { |
| 408 | t.Fatal(err) |
| 409 | } |
| 410 | t.Log(spew.Sdump(resp)) |
| 411 | } |
| 412 | |
| 413 | brokerID := coordinator.id |
| 414 | t.Cleanup( |
| 415 | func() { |
| 416 | if err := startDockerTestBroker(context.Background(), brokerID); err != nil { |
| 417 | t.Fatal(err) |
| 418 | } |
| 419 | }, |
| 420 | ) |
| 421 | if err := stopDockerTestBroker(context.Background(), brokerID); err != nil { |
| 422 | t.Fatal(err) |
| 423 | } |
| 424 | |
| 425 | { |
| 426 | resp, err := adminClient.ListConsumerGroupOffsets(group, map[string][]int32{"test.4": {0, 1, 2, 3}}) |
nothing calls this directly
no test coverage detected