MCPcopy
hub / github.com/segmentio/kafka-go / commitOffsetsWithRetry

Method commitOffsetsWithRetry

reader.go:147–166  ·  view source on GitHub ↗

commitOffsetsWithRetry attempts to commit the specified offsets and retries up to the specified number of times.

(gen *Generation, offsetStash offsetStash, retries int)

Source from the content-addressed store, hash-verified

145// commitOffsetsWithRetry attempts to commit the specified offsets and retries
146// up to the specified number of times.
147func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash, retries int) (err error) {
148 const (
149 backoffDelayMin = 100 * time.Millisecond
150 backoffDelayMax = 5 * time.Second
151 )
152
153 for attempt := 0; attempt < retries; attempt++ {
154 if attempt != 0 {
155 if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
156 return
157 }
158 }
159
160 if err = gen.CommitOffsets(offsetStash); err == nil {
161 return
162 }
163 }
164
165 return // err will not be nil
166}
167
168// offsetStash holds offsets by topic => partition => offset.
169type offsetStash map[string]map[int]int64

Callers 3

commitLoopImmediateMethod · 0.95
commitLoopIntervalMethod · 0.95

Calls 3

sleepFunction · 0.85
backoffFunction · 0.85
CommitOffsetsMethod · 0.80

Tested by 1