MCPcopy
hub / github.com/IBM/sarama / TestListConsumerGroupOffsetsBatch

Function TestListConsumerGroupOffsetsBatch

admin_test.go:1953–2126  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1951}
1952
1953func TestListConsumerGroupOffsetsBatch(t *testing.T) {
1954 const (
1955 topic = "my-topic"
1956 groupA = "group-a"
1957 groupB = "group-b"
1958 expectedOffsetA = int64(7)
1959 expectedOffsetB = int64(13)
1960 )
1961 bothGroups := map[string]map[string][]int32{
1962 groupA: {topic: {0}},
1963 groupB: {topic: {0}},
1964 }
1965
1966 // setup wires a single mock broker as the coordinator for every named group
1967 // and serves the given OffsetFetchRequest handler. Returns a v3.0 admin.
1968 setup := func(t *testing.T, offsetFetch MockResponse, groups ...string) ClusterAdmin {
1969 t.Helper()
1970 broker := newMockBroker(t, 1)
1971 broker.SetHandlerByMap(map[string]MockResponse{
1972 "OffsetFetchRequest": offsetFetch,
1973 "MetadataRequest": mockMetadataFor(t, broker),
1974 "FindCoordinatorRequest": mockGroupCoordinators(t, broker, groups...),
1975 })
1976 return newTestAdminAt(t, V3_0_0_0, broker)
1977 }
1978
1979 // groupBlock builds a v8 response group entry with a single block.
1980 groupBlock := func(group string, offset int64) OffsetFetchResponseGroup {
1981 return OffsetFetchResponseGroup{
1982 GroupId: group,
1983 Blocks: map[string]map[int32]*OffsetFetchResponseBlock{topic: {0: {Offset: offset}}},
1984 }
1985 }
1986
1987 t.Run("fetches offsets for multiple groups", func(t *testing.T) {
1988 admin := setup(t, NewMockOffsetFetchResponse(t).
1989 SetOffset(groupA, topic, 0, expectedOffsetA, "", ErrNoError).
1990 SetOffset(groupB, topic, 0, expectedOffsetB, "", ErrNoError).
1991 SetError(ErrNoError), groupA, groupB)
1992
1993 result, err := admin.ListConsumerGroupOffsetsBatch(bothGroups)
1994 require.NoError(t, err)
1995 assertGroupOffset(t, result, groupA, topic, 0, expectedOffsetA)
1996 assertGroupOffset(t, result, groupB, topic, 0, expectedOffsetB)
1997 })
1998
1999 t.Run("nil partitions fetches all topics for the group", func(t *testing.T) {
2000 const otherTopic = "other-topic"
2001 admin := setup(t, NewMockOffsetFetchResponse(t).
2002 SetOffset(groupA, topic, 0, expectedOffsetA, "", ErrNoError).
2003 SetOffset(groupA, otherTopic, 0, expectedOffsetB, "", ErrNoError).
2004 SetError(ErrNoError), groupA)
2005
2006 result, err := admin.ListConsumerGroupOffsetsBatch(map[string]map[string][]int32{groupA: nil})
2007 require.NoError(t, err)
2008 assertGroupOffset(t, result, groupA, topic, 0, expectedOffsetA)
2009 assertGroupOffset(t, result, groupA, otherTopic, 0, expectedOffsetB)
2010 })

Callers

nothing calls this directly

Calls 15

CloseMethod · 0.95
newMockBrokerFunction · 0.85
mockMetadataForFunction · 0.85
mockGroupCoordinatorsFunction · 0.85
newTestAdminAtFunction · 0.85
assertGroupOffsetFunction · 0.85
NewClusterAdminFunction · 0.85
NewMockSequenceFunction · 0.85
NewMockWrapperFunction · 0.85

Tested by

no test coverage detected