(t *testing.T)
| 439 | } |
| 440 | |
| 441 | func TestFuncAdminListConsumerGroupOffsetsBatch(t *testing.T) { |
| 442 | checkKafkaVersion(t, "3.0.0.0") |
| 443 | setupFunctionalTest(t) |
| 444 | defer teardownFunctionalTest(t) |
| 445 | |
| 446 | config := NewFunctionalTestConfig() |
| 447 | config.ClientID = t.Name() |
| 448 | client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 449 | require.NoError(t, err) |
| 450 | defer safeClose(t, client) |
| 451 | |
| 452 | groupA := testFuncConsumerGroupID(t) |
| 453 | groupB := testFuncConsumerGroupID(t) |
| 454 | const ( |
| 455 | topic = "test.4" |
| 456 | partition = int32(0) |
| 457 | offsetGrpA = int64(2) |
| 458 | offsetGrpB = int64(3) |
| 459 | ) |
| 460 | |
| 461 | for _, c := range []struct { |
| 462 | group string |
| 463 | offset int64 |
| 464 | }{{groupA, offsetGrpA}, {groupB, offsetGrpB}} { |
| 465 | offsetMgr, err := NewOffsetManagerFromClient(c.group, client) |
| 466 | require.NoError(t, err) |
| 467 | markOffset(t, offsetMgr, topic, partition, c.offset) |
| 468 | offsetMgr.Commit() |
| 469 | safeClose(t, offsetMgr) |
| 470 | } |
| 471 | |
| 472 | adminClient, err := NewClusterAdminFromClient(client) |
| 473 | require.NoError(t, err) |
| 474 | |
| 475 | result, err := adminClient.ListConsumerGroupOffsetsBatch(map[string]map[string][]int32{ |
| 476 | groupA: {topic: {partition}}, |
| 477 | groupB: {topic: {partition}}, |
| 478 | }) |
| 479 | require.NoError(t, err) |
| 480 | |
| 481 | for _, c := range []struct { |
| 482 | group string |
| 483 | offset int64 |
| 484 | }{{groupA, offsetGrpA}, {groupB, offsetGrpB}} { |
| 485 | require.Contains(t, result, c.group) |
| 486 | require.Equal(t, ErrNoError, result[c.group].Err) |
| 487 | block := result[c.group].GetBlock(topic, partition) |
| 488 | require.NotNil(t, block, "missing block for %s/%s/%d", c.group, topic, partition) |
| 489 | require.Equal(t, ErrNoError, block.Err) |
| 490 | require.Equal(t, c.offset, block.Offset) |
| 491 | } |
| 492 | } |
| 493 | |
| 494 | func TestFuncAdminListOffsets(t *testing.T) { |
| 495 | t.Parallel() |
nothing calls this directly
no test coverage detected