It consumes records for all the assigned partitions, prioritizing the ones with more lag. It keeps consuming until the lag of every partition is less than the cycle duration. When that happens, it returns the time to wait before the next consume cycle, based on the last record timestamp.
(ctx context.Context)
| 260 | // It consumes records for all the assigned partitions, prioritizing the ones with more lag. It keeps consuming until |
| 261 | // the lag of every partition is less than the cycle duration. When that happens, it returns the time to wait before the next consume cycle, based on the last record timestamp. |
| 262 | func (b *BlockBuilder) consume(ctx context.Context) (time.Duration, error) { |
| 263 | partitions := b.getAssignedPartitions() |
| 264 | |
| 265 | ctx, span := tracer.Start(ctx, "blockbuilder.consume", trace.WithAttributes(attribute.String("active_partitions", formatActivePartitions(partitions)))) |
| 266 | defer span.End() |
| 267 | |
| 268 | if len(partitions) == 0 { |
| 269 | return b.cfg.ConsumeCycleDuration, errNoPartitionsAssigned |
| 270 | } |
| 271 | |
| 272 | level.Info(b.logger).Log("msg", "starting consume cycle", "active_partitions", formatActivePartitions(partitions)) |
| 273 | defer func(t time.Time) { metricConsumeCycleDuration.Observe(time.Since(t).Seconds()) }(time.Now()) |
| 274 | |
| 275 | // Clear all previous remnants |
| 276 | err := b.wal.Clear() |
| 277 | if err != nil { |
| 278 | return 0, err |
| 279 | } |
| 280 | |
| 281 | ps, err := b.fetchPartitions(ctx, partitions) |
| 282 | if err != nil { |
| 283 | return 0, err |
| 284 | } |
| 285 | |
| 286 | // First iteration over all the assigned partitions to get their current lag in time |
| 287 | for i, p := range ps { |
| 288 | if !p.hasRecords() { // No records, skip for the first iteration |
| 289 | // We treat the partition as updated through now, |
| 290 | // and will check it again after ConsumeCycleDuration has elapsed |
| 291 | ps[i].lastRecordTs = time.Now() |
| 292 | ps[i].commitOffset = 0 // always start at beginning |
| 293 | level.Info(b.logger).Log("msg", "partition has no records", "partition", p.partition) |
| 294 | continue |
| 295 | } |
| 296 | lastRecordTs, commitOffset, err := b.consumePartition(ctx, p) |
| 297 | if err != nil { |
| 298 | return 0, err |
| 299 | } |
| 300 | ps[i].lastRecordTs = lastRecordTs |
| 301 | ps[i].commitOffset = commitOffset |
| 302 | } |
| 303 | |
| 304 | // Iterate over the laggiest partition until the lag is less than the cycle duration or none of the partitions has records |
| 305 | for { |
| 306 | sort.Slice(ps, func(i, j int) bool { |
| 307 | return ps[i].lastRecordTs.Before(ps[j].lastRecordTs) |
| 308 | }) |
| 309 | |
| 310 | laggiestPartition := ps[0] |
| 311 | if laggiestPartition.lastRecordTs.IsZero() { |
| 312 | return b.cfg.ConsumeCycleDuration, errors.New("partition has no last record timestamp") |
| 313 | } |
| 314 | |
| 315 | lagTime := time.Since(laggiestPartition.lastRecordTs) |
| 316 | if lagTime < b.cfg.ConsumeCycleDuration { |
| 317 | return b.cfg.ConsumeCycleDuration - lagTime, nil |
| 318 | } |
| 319 | // If we don't know exact offset, we need to start over on next cycle to fetch offsets again. |