commitOffsetsWithRetry attempts to commit the specified offsets and retries up to the specified number of times.
(gen *Generation, offsetStash offsetStash, retries int)
| 145 | // commitOffsetsWithRetry attempts to commit the specified offsets and retries |
| 146 | // up to the specified number of times. |
| 147 | func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash, retries int) (err error) { |
| 148 | const ( |
| 149 | backoffDelayMin = 100 * time.Millisecond |
| 150 | backoffDelayMax = 5 * time.Second |
| 151 | ) |
| 152 | |
| 153 | for attempt := 0; attempt < retries; attempt++ { |
| 154 | if attempt != 0 { |
| 155 | if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) { |
| 156 | return |
| 157 | } |
| 158 | } |
| 159 | |
| 160 | if err = gen.CommitOffsets(offsetStash); err == nil { |
| 161 | return |
| 162 | } |
| 163 | } |
| 164 | |
| 165 | return // err will not be nil |
| 166 | } |
| 167 | |
| 168 | // offsetStash holds offsets by topic => partition => offset. |
| 169 | type offsetStash map[string]map[int]int64 |