MCPcopy
hub / github.com/segmentio/kafka-go / TestCommitOffsetsWithRetry

Function TestCommitOffsetsWithRetry

reader_test.go:1321–1371  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1319}
1320
1321func TestCommitOffsetsWithRetry(t *testing.T) {
1322 offsets := offsetStash{"topic": {0: 0}}
1323
1324 tests := map[string]struct {
1325 Fails int
1326 Invocations int
1327 HasError bool
1328 }{
1329 "happy path": {
1330 Invocations: 1,
1331 },
1332 "1 retry": {
1333 Fails: 1,
1334 Invocations: 2,
1335 },
1336 "out of retries": {
1337 Fails: defaultCommitRetries + 1,
1338 Invocations: defaultCommitRetries,
1339 HasError: true,
1340 },
1341 }
1342
1343 for label, test := range tests {
1344 t.Run(label, func(t *testing.T) {
1345 count := 0
1346 gen := &Generation{
1347 conn: mockCoordinator{
1348 offsetCommitFunc: func(offsetCommitRequestV2) (offsetCommitResponseV2, error) {
1349 count++
1350 if count <= test.Fails {
1351 return offsetCommitResponseV2{}, io.EOF
1352 }
1353 return offsetCommitResponseV2{}, nil
1354 },
1355 },
1356 done: make(chan struct{}),
1357 log: func(func(Logger)) {},
1358 logError: func(func(Logger)) {},
1359 }
1360
1361 r := &Reader{stctx: context.Background()}
1362 err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries)
1363 switch {
1364 case test.HasError && err == nil:
1365 t.Error("bad err: expected not nil; got nil")
1366 case !test.HasError && err != nil:
1367 t.Errorf("bad err: expected nil; got %v", err)
1368 }
1369 })
1370 }
1371}
1372
1373// Test that a reader won't continually rebalance when there are more consumers
1374// than partitions in a group.

Callers

nothing calls this directly

Calls 2

ErrorMethod · 0.45

Tested by

no test coverage detected