(t *testing.T)
| 1951 | } |
| 1952 | |
| 1953 | func TestListConsumerGroupOffsetsBatch(t *testing.T) { |
| 1954 | const ( |
| 1955 | topic = "my-topic" |
| 1956 | groupA = "group-a" |
| 1957 | groupB = "group-b" |
| 1958 | expectedOffsetA = int64(7) |
| 1959 | expectedOffsetB = int64(13) |
| 1960 | ) |
| 1961 | bothGroups := map[string]map[string][]int32{ |
| 1962 | groupA: {topic: {0}}, |
| 1963 | groupB: {topic: {0}}, |
| 1964 | } |
| 1965 | |
| 1966 | // setup wires a single mock broker as the coordinator for every named group |
| 1967 | // and serves the given OffsetFetchRequest handler. Returns a v3.0 admin. |
| 1968 | setup := func(t *testing.T, offsetFetch MockResponse, groups ...string) ClusterAdmin { |
| 1969 | t.Helper() |
| 1970 | broker := newMockBroker(t, 1) |
| 1971 | broker.SetHandlerByMap(map[string]MockResponse{ |
| 1972 | "OffsetFetchRequest": offsetFetch, |
| 1973 | "MetadataRequest": mockMetadataFor(t, broker), |
| 1974 | "FindCoordinatorRequest": mockGroupCoordinators(t, broker, groups...), |
| 1975 | }) |
| 1976 | return newTestAdminAt(t, V3_0_0_0, broker) |
| 1977 | } |
| 1978 | |
| 1979 | // groupBlock builds a v8 response group entry with a single block. |
| 1980 | groupBlock := func(group string, offset int64) OffsetFetchResponseGroup { |
| 1981 | return OffsetFetchResponseGroup{ |
| 1982 | GroupId: group, |
| 1983 | Blocks: map[string]map[int32]*OffsetFetchResponseBlock{topic: {0: {Offset: offset}}}, |
| 1984 | } |
| 1985 | } |
| 1986 | |
| 1987 | t.Run("fetches offsets for multiple groups", func(t *testing.T) { |
| 1988 | admin := setup(t, NewMockOffsetFetchResponse(t). |
| 1989 | SetOffset(groupA, topic, 0, expectedOffsetA, "", ErrNoError). |
| 1990 | SetOffset(groupB, topic, 0, expectedOffsetB, "", ErrNoError). |
| 1991 | SetError(ErrNoError), groupA, groupB) |
| 1992 | |
| 1993 | result, err := admin.ListConsumerGroupOffsetsBatch(bothGroups) |
| 1994 | require.NoError(t, err) |
| 1995 | assertGroupOffset(t, result, groupA, topic, 0, expectedOffsetA) |
| 1996 | assertGroupOffset(t, result, groupB, topic, 0, expectedOffsetB) |
| 1997 | }) |
| 1998 | |
| 1999 | t.Run("nil partitions fetches all topics for the group", func(t *testing.T) { |
| 2000 | const otherTopic = "other-topic" |
| 2001 | admin := setup(t, NewMockOffsetFetchResponse(t). |
| 2002 | SetOffset(groupA, topic, 0, expectedOffsetA, "", ErrNoError). |
| 2003 | SetOffset(groupA, otherTopic, 0, expectedOffsetB, "", ErrNoError). |
| 2004 | SetError(ErrNoError), groupA) |
| 2005 | |
| 2006 | result, err := admin.ListConsumerGroupOffsetsBatch(map[string]map[string][]int32{groupA: nil}) |
| 2007 | require.NoError(t, err) |
| 2008 | assertGroupOffset(t, result, groupA, topic, 0, expectedOffsetA) |
| 2009 | assertGroupOffset(t, result, groupA, otherTopic, 0, expectedOffsetB) |
| 2010 | }) |
nothing calls this directly
no test coverage detected