| 223 | } |
| 224 | |
| 225 | func (b *BlockBuilder) running(ctx context.Context) error { |
| 226 | defer close(b.consumeStopped) |
| 227 | for { |
| 228 | // Create a detached context for consume |
| 229 | // This is so that when the parent context is canceled and the block builder is stopping, |
| 230 | // we still finish the current consumption and flush of blocks. That is preferred than |
| 231 | // to starting over after a restart. |
| 232 | consumeCtx, cancel := context.WithCancel(context.Background()) |
| 233 | |
| 234 | waitTime, err := b.consume(consumeCtx) |
| 235 | cancel() // Always cancel the context after consume completes |
| 236 | |
| 237 | if err != nil { |
| 238 | level.Error(b.logger).Log("msg", "consumeCycle failed", "err", err) |
| 239 | } |
| 240 | |
| 241 | // Always check for cancellation before going to next cycle. |
| 242 | // There are cases like when the queue is lagged, that waitTime could be zero. |
| 243 | // In this case it's non-deterministic which select statement will be executed below, |
| 244 | // so we do a specific check here first. |
| 245 | if ctx.Err() != nil { |
| 246 | // Parent context canceled, return |
| 247 | return nil |
| 248 | } |
| 249 | |
| 250 | // Now wait for next cycle or cancellation. |
| 251 | select { |
| 252 | case <-time.After(waitTime): // Continue with next cycle |
| 253 | case <-ctx.Done(): |
| 254 | // Parent context canceled, return |
| 255 | return nil |
| 256 | } |
| 257 | } |
| 258 | } |
| 259 | |
| 260 | // It consumes records for all the asigneed partitions, priorizing the ones with more lag. It keeps consuming until |
| 261 | // all the partitions lag is less than the cycle duration. When that happen it returns time to wait before another consuming cycle, based on the last record timestamp |