MCPcopy
hub / github.com/segmentio/kafka-go / commitLoopImmediate

Method commitLoopImmediate

reader.go:194–227  ·  view source on GitHub ↗

commitLoopImmediate handles each commit synchronously.

(ctx context.Context, gen *Generation)

Source from the content-addressed store, hash-verified

192
193// commitLoopImmediate handles each commit synchronously.
194func (r *Reader) commitLoopImmediate(ctx context.Context, gen *Generation) {
195 offsets := offsetStash{}
196
197 for {
198 select {
199 case <-ctx.Done():
200 // drain the commit channel and prepare a single, final commit.
201 // the commit will combine any outstanding requests and the result
202 // will be sent back to all the callers of CommitMessages so that
203 // they can return.
204 var errchs []chan<- error
205 for hasCommits := true; hasCommits; {
206 select {
207 case req := <-r.commits:
208 offsets.merge(req.commits)
209 errchs = append(errchs, req.errch)
210 default:
211 hasCommits = false
212 }
213 }
214 err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries)
215 for _, errch := range errchs {
216 // NOTE : this will be a buffered channel and will not block.
217 errch <- err
218 }
219 return
220
221 case req := <-r.commits:
222 offsets.merge(req.commits)
223 req.errch <- r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries)
224 offsets.reset()
225 }
226 }
227}
228
229// commitLoopInterval handles each commit asynchronously with a period defined
230// by ReaderConfig.CommitInterval.

Callers 2

commitLoopMethod · 0.95

Calls 4

mergeMethod · 0.95
resetMethod · 0.95
DoneMethod · 0.80