MCPcopy
hub / github.com/IBM/sarama / TestFuncAdminListConsumerGroupOffsetsBatch

Function TestFuncAdminListConsumerGroupOffsetsBatch

functional_admin_test.go:441–492  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

439}
440
441func TestFuncAdminListConsumerGroupOffsetsBatch(t *testing.T) {
442 checkKafkaVersion(t, "3.0.0.0")
443 setupFunctionalTest(t)
444 defer teardownFunctionalTest(t)
445
446 config := NewFunctionalTestConfig()
447 config.ClientID = t.Name()
448 client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
449 require.NoError(t, err)
450 defer safeClose(t, client)
451
452 groupA := testFuncConsumerGroupID(t)
453 groupB := testFuncConsumerGroupID(t)
454 const (
455 topic = "test.4"
456 partition = int32(0)
457 offsetGrpA = int64(2)
458 offsetGrpB = int64(3)
459 )
460
461 for _, c := range []struct {
462 group string
463 offset int64
464 }{{groupA, offsetGrpA}, {groupB, offsetGrpB}} {
465 offsetMgr, err := NewOffsetManagerFromClient(c.group, client)
466 require.NoError(t, err)
467 markOffset(t, offsetMgr, topic, partition, c.offset)
468 offsetMgr.Commit()
469 safeClose(t, offsetMgr)
470 }
471
472 adminClient, err := NewClusterAdminFromClient(client)
473 require.NoError(t, err)
474
475 result, err := adminClient.ListConsumerGroupOffsetsBatch(map[string]map[string][]int32{
476 groupA: {topic: {partition}},
477 groupB: {topic: {partition}},
478 })
479 require.NoError(t, err)
480
481 for _, c := range []struct {
482 group string
483 offset int64
484 }{{groupA, offsetGrpA}, {groupB, offsetGrpB}} {
485 require.Contains(t, result, c.group)
486 require.Equal(t, ErrNoError, result[c.group].Err)
487 block := result[c.group].GetBlock(topic, partition)
488 require.NotNil(t, block, "missing block for %s/%s/%d", c.group, topic, partition)
489 require.Equal(t, ErrNoError, block.Err)
490 require.Equal(t, c.offset, block.Offset)
491 }
492}
493
494func TestFuncAdminListOffsets(t *testing.T) {
495 t.Parallel()

Callers

nothing calls this directly

Calls 14

checkKafkaVersion · Function · 0.85
setupFunctionalTest · Function · 0.85
teardownFunctionalTest · Function · 0.85
NewFunctionalTestConfig · Function · 0.85
testFuncConsumerGroupID · Function · 0.85
markOffset · Function · 0.85
NewClient · Function · 0.70
safeClose · Function · 0.70
Name · Method · 0.65
Commit · Method · 0.65

Tested by

no test coverage detected