commitLoop processes commits off the commit chan.
(ctx context.Context, gen *Generation)
| 270 | |
| 271 | // commitLoop processes commits off the commit chan. |
| 272 | func (r *Reader) commitLoop(ctx context.Context, gen *Generation) { |
| 273 | r.withLogger(func(l Logger) { |
| 274 | l.Printf("started commit for group %s\n", r.config.GroupID) |
| 275 | }) |
| 276 | defer r.withLogger(func(l Logger) { |
| 277 | l.Printf("stopped commit for group %s\n", r.config.GroupID) |
| 278 | }) |
| 279 | |
| 280 | if r.useSyncCommits() { |
| 281 | r.commitLoopImmediate(ctx, gen) |
| 282 | } else { |
| 283 | r.commitLoopInterval(ctx, gen) |
| 284 | } |
| 285 | } |
| 286 | |
| 287 | // run provides the main consumer group management loop. Each iteration performs the |
| 288 | // handshake to join the Reader to the consumer group. |
no test coverage detected