MCPcopy
hub / github.com/segmentio/kafka-go / TestCommitLoopImmediateFlushOnGenerationEnd

Function TestCommitLoopImmediateFlushOnGenerationEnd

reader_test.go:1273–1319  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1271}
1272
1273func TestCommitLoopImmediateFlushOnGenerationEnd(t *testing.T) {
1274 t.Parallel()
1275 var committedOffset int64
1276 var commitCount int
1277 gen := &Generation{
1278 conn: mockCoordinator{
1279 offsetCommitFunc: func(r offsetCommitRequestV2) (offsetCommitResponseV2, error) {
1280 commitCount++
1281 committedOffset = r.Topics[0].Partitions[0].Offset
1282 return offsetCommitResponseV2{}, nil
1283 },
1284 },
1285 done: make(chan struct{}),
1286 log: func(func(Logger)) {},
1287 logError: func(func(Logger)) {},
1288 joined: make(chan struct{}),
1289 }
1290
1291 // initialize commits so that the commitLoopImmediate select statement blocks
1292 r := &Reader{stctx: context.Background(), commits: make(chan commitRequest, 100)}
1293
1294 for i := 0; i < 100; i++ {
1295 cr := commitRequest{
1296 commits: []commit{{
1297 topic: "topic",
1298 partition: 0,
1299 offset: int64(i) + 1,
1300 }},
1301 errch: make(chan<- error, 1),
1302 }
1303 r.commits <- cr
1304 }
1305
1306 gen.Start(func(ctx context.Context) {
1307 r.commitLoopImmediate(ctx, gen)
1308 })
1309
1310 gen.close()
1311
1312 if committedOffset != 100 {
1313 t.Fatalf("expected commited offset to be 100 but got %d", committedOffset)
1314 }
1315
1316 if commitCount >= 100 {
1317 t.Fatalf("expected a single final commit on generation end got %d", commitCount)
1318 }
1319}
1320
1321func TestCommitOffsetsWithRetry(t *testing.T) {
1322 offsets := offsetStash{"topic": {0: 0}}

Callers

nothing calls this directly

Calls 3

StartMethod · 0.95
commitLoopImmediateMethod · 0.95
closeMethod · 0.95

Tested by

no test coverage detected