commitLoopInterval handles each commit asynchronously with a period defined by ReaderConfig.CommitInterval.
(ctx context.Context, gen *Generation)
| 229 | // commitLoopInterval handles each commit asynchronously with a period defined |
| 230 | // by ReaderConfig.CommitInterval. |
| 231 | func (r *Reader) commitLoopInterval(ctx context.Context, gen *Generation) { |
| 232 | ticker := time.NewTicker(r.config.CommitInterval) |
| 233 | defer ticker.Stop() |
| 234 | |
| 235 | // the offset stash should not survive rebalances b/c the consumer may |
| 236 | // receive new assignments. |
| 237 | offsets := offsetStash{} |
| 238 | |
| 239 | commit := func() { |
| 240 | if err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries); err != nil { |
| 241 | r.withErrorLogger(func(l Logger) { l.Printf("%v", err) }) |
| 242 | } else { |
| 243 | offsets.reset() |
| 244 | } |
| 245 | } |
| 246 | |
| 247 | for { |
| 248 | select { |
| 249 | case <-ctx.Done(): |
| 250 | // drain the commit channel in order to prepare the final commit. |
| 251 | for hasCommits := true; hasCommits; { |
| 252 | select { |
| 253 | case req := <-r.commits: |
| 254 | offsets.merge(req.commits) |
| 255 | default: |
| 256 | hasCommits = false |
| 257 | } |
| 258 | } |
| 259 | commit() |
| 260 | return |
| 261 | |
| 262 | case <-ticker.C: |
| 263 | commit() |
| 264 | |
| 265 | case req := <-r.commits: |
| 266 | offsets.merge(req.commits) |
| 267 | } |
| 268 | } |
| 269 | } |
| 270 | |
| 271 | // commitLoop processes commits off the commit chan. |
| 272 | func (r *Reader) commitLoop(ctx context.Context, gen *Generation) { |
no test coverage detected