MCPcopy
hub / github.com/IBM/sarama / TestAlterConsumerGroupOffsets

Function TestAlterConsumerGroupOffsets

admin_test.go:2303–2352  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

2301}
2302
2303func TestAlterConsumerGroupOffsets(t *testing.T) {
2304 const (
2305 group = "my-group"
2306 topic = "my-topic"
2307 partition = int32(0)
2308 )
2309 offsets := map[string]map[int32]OffsetAndMetadata{
2310 topic: {partition: {Offset: 100, LeaderEpoch: -1}},
2311 }
2312
2313 t.Run("commits offsets via the group coordinator", func(t *testing.T) {
2314 broker := newMockBroker(t, 1)
2315 broker.SetHandlerByMap(map[string]MockResponse{
2316 "OffsetCommitRequest": NewMockOffsetCommitResponse(t).SetError(group, topic, partition, ErrNoError),
2317 "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, broker),
2318 "MetadataRequest": mockMetadataFor(t, broker),
2319 })
2320
2321 response, err := newTestAdmin(t, broker).AlterConsumerGroupOffsets(group, offsets, nil)
2322 require.NoError(t, err)
2323 assert.Equal(t, ErrNoError, response.Errors[topic][partition])
2324 })
2325
2326 t.Run("returns ConfigurationError when no offsets provided", func(t *testing.T) {
2327 broker := newMockBroker(t, 1)
2328 broker.SetHandlerByMap(map[string]MockResponse{"MetadataRequest": mockMetadataFor(t, broker)})
2329
2330 response, err := newTestAdmin(t, broker).AlterConsumerGroupOffsets(group, nil, nil)
2331 assert.Nil(t, response)
2332
2333 var cfgErr ConfigurationError
2334 require.ErrorAs(t, err, &cfgErr)
2335 })
2336
2337 t.Run("retries on per-partition NOT_COORDINATOR", func(t *testing.T) {
2338 broker := newMockBroker(t, 1)
2339 broker.SetHandlerByMap(map[string]MockResponse{
2340 "OffsetCommitRequest": NewMockSequence(
2341 NewMockOffsetCommitResponse(t).SetError(group, topic, partition, ErrNotCoordinatorForConsumer),
2342 NewMockOffsetCommitResponse(t).SetError(group, topic, partition, ErrNoError),
2343 ),
2344 "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, broker),
2345 "MetadataRequest": mockMetadataFor(t, broker),
2346 })
2347
2348 response, err := newTestAdmin(t, broker).AlterConsumerGroupOffsets(group, offsets, nil)
2349 require.NoError(t, err)
2350 assert.Equal(t, ErrNoError, response.Errors[topic][partition])
2351 })
2352}
2353
2354func TestDeleteConsumerGroup(t *testing.T) {
2355 seedBroker := NewMockBroker(t, 1)

Callers

nothing calls this directly

Calls 11

newMockBrokerFunction · 0.85
mockMetadataForFunction · 0.85
newTestAdminFunction · 0.85
NewMockSequenceFunction · 0.85
RunMethod · 0.80
SetHandlerByMapMethod · 0.80
SetErrorMethod · 0.45
SetCoordinatorMethod · 0.45

Tested by

no test coverage detected