(t *testing.T)
| 2301 | } |
| 2302 | |
| 2303 | func TestAlterConsumerGroupOffsets(t *testing.T) { |
| 2304 | const ( |
| 2305 | group = "my-group" |
| 2306 | topic = "my-topic" |
| 2307 | partition = int32(0) |
| 2308 | ) |
| 2309 | offsets := map[string]map[int32]OffsetAndMetadata{ |
| 2310 | topic: {partition: {Offset: 100, LeaderEpoch: -1}}, |
| 2311 | } |
| 2312 | |
| 2313 | t.Run("commits offsets via the group coordinator", func(t *testing.T) { |
| 2314 | broker := newMockBroker(t, 1) |
| 2315 | broker.SetHandlerByMap(map[string]MockResponse{ |
| 2316 | "OffsetCommitRequest": NewMockOffsetCommitResponse(t).SetError(group, topic, partition, ErrNoError), |
| 2317 | "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, broker), |
| 2318 | "MetadataRequest": mockMetadataFor(t, broker), |
| 2319 | }) |
| 2320 | |
| 2321 | response, err := newTestAdmin(t, broker).AlterConsumerGroupOffsets(group, offsets, nil) |
| 2322 | require.NoError(t, err) |
| 2323 | assert.Equal(t, ErrNoError, response.Errors[topic][partition]) |
| 2324 | }) |
| 2325 | |
| 2326 | t.Run("returns ConfigurationError when no offsets provided", func(t *testing.T) { |
| 2327 | broker := newMockBroker(t, 1) |
| 2328 | broker.SetHandlerByMap(map[string]MockResponse{"MetadataRequest": mockMetadataFor(t, broker)}) |
| 2329 | |
| 2330 | response, err := newTestAdmin(t, broker).AlterConsumerGroupOffsets(group, nil, nil) |
| 2331 | assert.Nil(t, response) |
| 2332 | |
| 2333 | var cfgErr ConfigurationError |
| 2334 | require.ErrorAs(t, err, &cfgErr) |
| 2335 | }) |
| 2336 | |
| 2337 | t.Run("retries on per-partition NOT_COORDINATOR", func(t *testing.T) { |
| 2338 | broker := newMockBroker(t, 1) |
| 2339 | broker.SetHandlerByMap(map[string]MockResponse{ |
| 2340 | "OffsetCommitRequest": NewMockSequence( |
| 2341 | NewMockOffsetCommitResponse(t).SetError(group, topic, partition, ErrNotCoordinatorForConsumer), |
| 2342 | NewMockOffsetCommitResponse(t).SetError(group, topic, partition, ErrNoError), |
| 2343 | ), |
| 2344 | "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, broker), |
| 2345 | "MetadataRequest": mockMetadataFor(t, broker), |
| 2346 | }) |
| 2347 | |
| 2348 | response, err := newTestAdmin(t, broker).AlterConsumerGroupOffsets(group, offsets, nil) |
| 2349 | require.NoError(t, err) |
| 2350 | assert.Equal(t, ErrNoError, response.Errors[topic][partition]) |
| 2351 | }) |
| 2352 | } |
| 2353 | |
| 2354 | func TestDeleteConsumerGroup(t *testing.T) { |
| 2355 | seedBroker := NewMockBroker(t, 1) |
nothing calls this directly
no test coverage detected