hub / github.com/segmentio/kafka-go / commitLoop

Method commitLoop

reader.go:272–285

commitLoop processes commits off the commit chan.

(ctx context.Context, gen *Generation)

Source from the content-addressed store, hash-verified

// commitLoop processes commits off the commit chan.
func (r *Reader) commitLoop(ctx context.Context, gen *Generation) {
	r.withLogger(func(l Logger) {
		l.Printf("started commit for group %s\n", r.config.GroupID)
	})
	defer r.withLogger(func(l Logger) {
		l.Printf("stopped commit for group %s\n", r.config.GroupID)
	})

	if r.useSyncCommits() {
		r.commitLoopImmediate(ctx, gen)
	} else {
		r.commitLoopInterval(ctx, gen)
	}
}

// run provides the main consumer group management loop. Each iteration performs the
// handshake to join the Reader to the consumer group.

Callers 1

run · Method · 0.95

Calls 5

withLogger · Method · 0.95
useSyncCommits · Method · 0.95
commitLoopImmediate · Method · 0.95
commitLoopInterval · Method · 0.95
Printf · Method · 0.65

Tested by

no test coverage detected