| 187 | } |
| 188 | |
| 189 | func (r *PartitionReader) commitLoop(ctx context.Context) { |
| 190 | if r.commitInterval == 0 { // Sync commits |
| 191 | return |
| 192 | } |
| 193 | |
| 194 | t := time.NewTicker(r.commitInterval) |
| 195 | defer t.Stop() |
| 196 | |
| 197 | var lastCommittedOffset kadm.Offset |
| 198 | |
| 199 | for { |
| 200 | select { |
| 201 | case <-ctx.Done(): |
| 202 | // Commit one last time before shutting down |
| 203 | func() { |
| 204 | // Detach context with a deadline |
| 205 | ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second)) |
| 206 | defer cancel() |
| 207 | r.commitHighWatermark(ctx, lastCommittedOffset) |
| 208 | }() |
| 209 | return |
| 210 | case <-t.C: |
| 211 | lastCommittedOffset = r.commitHighWatermark(ctx, lastCommittedOffset) |
| 212 | } |
| 213 | } |
| 214 | } |
| 215 | |
| 216 | func collectFetchErrs(fetches kgo.Fetches) (_ error) { |
| 217 | mErr := multierror.New() |