commitLoopImmediate handles each commit synchronously.
(ctx context.Context, gen *Generation)
| 192 | |
| 193 | // commitLoopImmediate handles each commit synchronously: every request received on r.commits is merged into a local offset stash, committed through commitOffsetsWithRetry, answered on its errch, and the stash is reset before the next request is read; once ctx is done it performs one final combined commit of the requests still queued and returns. |
| 194 | func (r *Reader) commitLoopImmediate(ctx context.Context, gen *Generation) { |
| 195 | offsets := offsetStash{} |
| 196 | |
| 197 | for { |
| 198 | select { |
| 199 | case <-ctx.Done(): |
| 200 | // ctx is done: drain the commit channel without blocking (the inner |
| 201 | // select falls through to default once it is empty) and prepare a single, |
| 202 | // final commit. the commit combines all drained requests and its result |
| 203 | // is sent to each of their errch so that callers of CommitMessages can return. |
| 204 | var errchs []chan<- error |
| 205 | for hasCommits := true; hasCommits; { |
| 206 | select { |
| 207 | case req := <-r.commits: |
| 208 | offsets.merge(req.commits) |
| 209 | errchs = append(errchs, req.errch) |
| 210 | default: |
| 211 | hasCommits = false |
| 212 | } |
| 213 | } |
| 214 | err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries) |
| 215 | for _, errch := range errchs { |
| 216 | // NOTE: errch is expected to be a buffered channel, so this send will not block (confirm at the site that creates the request). |
| 217 | errch <- err |
| 218 | } |
| 219 | return |
| 220 | |
| 221 | case req := <-r.commits: |
| 222 | offsets.merge(req.commits) |
| 223 | req.errch <- r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries) |
| 224 | offsets.reset() |
| 225 | } |
| 226 | } |
| 227 | } |
| 228 | |
| 229 | // commitLoopInterval handles each commit asynchronously with a period defined |
| 230 | // by ReaderConfig.CommitInterval. |