(t *testing.T)
| 1271 | } |
| 1272 | |
| 1273 | func TestCommitLoopImmediateFlushOnGenerationEnd(t *testing.T) { |
| 1274 | t.Parallel() |
| 1275 | var committedOffset int64 |
| 1276 | var commitCount int |
| 1277 | gen := &Generation{ |
| 1278 | conn: mockCoordinator{ |
| 1279 | offsetCommitFunc: func(r offsetCommitRequestV2) (offsetCommitResponseV2, error) { |
| 1280 | commitCount++ |
| 1281 | committedOffset = r.Topics[0].Partitions[0].Offset |
| 1282 | return offsetCommitResponseV2{}, nil |
| 1283 | }, |
| 1284 | }, |
| 1285 | done: make(chan struct{}), |
| 1286 | log: func(func(Logger)) {}, |
| 1287 | logError: func(func(Logger)) {}, |
| 1288 | joined: make(chan struct{}), |
| 1289 | } |
| 1290 | |
| 1291 | // initialize commits so that the commitLoopImmediate select statement blocks |
| 1292 | r := &Reader{stctx: context.Background(), commits: make(chan commitRequest, 100)} |
| 1293 | |
| 1294 | for i := 0; i < 100; i++ { |
| 1295 | cr := commitRequest{ |
| 1296 | commits: []commit{{ |
| 1297 | topic: "topic", |
| 1298 | partition: 0, |
| 1299 | offset: int64(i) + 1, |
| 1300 | }}, |
| 1301 | errch: make(chan<- error, 1), |
| 1302 | } |
| 1303 | r.commits <- cr |
| 1304 | } |
| 1305 | |
| 1306 | gen.Start(func(ctx context.Context) { |
| 1307 | r.commitLoopImmediate(ctx, gen) |
| 1308 | }) |
| 1309 | |
| 1310 | gen.close() |
| 1311 | |
| 1312 | if committedOffset != 100 { |
| 1313 | t.Fatalf("expected commited offset to be 100 but got %d", committedOffset) |
| 1314 | } |
| 1315 | |
| 1316 | if commitCount >= 100 { |
| 1317 | t.Fatalf("expected a single final commit on generation end got %d", commitCount) |
| 1318 | } |
| 1319 | } |
| 1320 | |
| 1321 | func TestCommitOffsetsWithRetry(t *testing.T) { |
| 1322 | offsets := offsetStash{"topic": {0: 0}} |
nothing calls this directly
no test coverage detected