(t *testing.T)
| 1319 | } |
| 1320 | |
| 1321 | func TestCommitOffsetsWithRetry(t *testing.T) { |
| 1322 | offsets := offsetStash{"topic": {0: 0}} |
| 1323 | |
| 1324 | tests := map[string]struct { |
| 1325 | Fails int |
| 1326 | Invocations int |
| 1327 | HasError bool |
| 1328 | }{ |
| 1329 | "happy path": { |
| 1330 | Invocations: 1, |
| 1331 | }, |
| 1332 | "1 retry": { |
| 1333 | Fails: 1, |
| 1334 | Invocations: 2, |
| 1335 | }, |
| 1336 | "out of retries": { |
| 1337 | Fails: defaultCommitRetries + 1, |
| 1338 | Invocations: defaultCommitRetries, |
| 1339 | HasError: true, |
| 1340 | }, |
| 1341 | } |
| 1342 | |
| 1343 | for label, test := range tests { |
| 1344 | t.Run(label, func(t *testing.T) { |
| 1345 | count := 0 |
| 1346 | gen := &Generation{ |
| 1347 | conn: mockCoordinator{ |
| 1348 | offsetCommitFunc: func(offsetCommitRequestV2) (offsetCommitResponseV2, error) { |
| 1349 | count++ |
| 1350 | if count <= test.Fails { |
| 1351 | return offsetCommitResponseV2{}, io.EOF |
| 1352 | } |
| 1353 | return offsetCommitResponseV2{}, nil |
| 1354 | }, |
| 1355 | }, |
| 1356 | done: make(chan struct{}), |
| 1357 | log: func(func(Logger)) {}, |
| 1358 | logError: func(func(Logger)) {}, |
| 1359 | } |
| 1360 | |
| 1361 | r := &Reader{stctx: context.Background()} |
| 1362 | err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries) |
| 1363 | switch { |
| 1364 | case test.HasError && err == nil: |
| 1365 | t.Error("bad err: expected not nil; got nil") |
| 1366 | case !test.HasError && err != nil: |
| 1367 | t.Errorf("bad err: expected nil; got %v", err) |
| 1368 | } |
| 1369 | }) |
| 1370 | } |
| 1371 | } |
| 1372 | |
| 1373 | // Test that a reader won't continually rebalance when there are more consumers |
| 1374 | // than partitions in a group. |
nothing calls this directly
no test coverage detected