MCPcopy
hub / github.com/grafana/tempo / consume

Method consume

modules/blockbuilder/blockbuilder.go:262–331  ·  view source on GitHub ↗

It consumes records for all the assigned partitions, prioritizing the ones with more lag. It keeps consuming until every partition's lag is less than the cycle duration. When that happens, it returns the time to wait before the next consume cycle, based on the last record timestamp.

(ctx context.Context)

Source from the content-addressed store, hash-verified

260// It consumes records for all the assigned partitions, prioritizing the ones with more lag. It keeps consuming until
261// every partition's lag is less than the cycle duration. When that happens, it returns the time to wait before the next consume cycle, based on the last record timestamp.
262func (b *BlockBuilder) consume(ctx context.Context) (time.Duration, error) {
263 partitions := b.getAssignedPartitions()
264
265 ctx, span := tracer.Start(ctx, "blockbuilder.consume", trace.WithAttributes(attribute.String("active_partitions", formatActivePartitions(partitions))))
266 defer span.End()
267
268 if len(partitions) == 0 {
269 return b.cfg.ConsumeCycleDuration, errNoPartitionsAssigned
270 }
271
272 level.Info(b.logger).Log("msg", "starting consume cycle", "active_partitions", formatActivePartitions(partitions))
273 defer func(t time.Time) { metricConsumeCycleDuration.Observe(time.Since(t).Seconds()) }(time.Now())
274
275 // Clear all previous remnants
276 err := b.wal.Clear()
277 if err != nil {
278 return 0, err
279 }
280
281 ps, err := b.fetchPartitions(ctx, partitions)
282 if err != nil {
283 return 0, err
284 }
285
286 // First iteration over all the assigned partitions to get their current lag in time
287 for i, p := range ps {
288 if !p.hasRecords() { // No records, skip for the first iteration
289 // We treat the partition as updated through now,
290 // and will check it again after ConsumeCycleDuration has elapsed
291 ps[i].lastRecordTs = time.Now()
292 ps[i].commitOffset = 0 // always start at beginning
293 level.Info(b.logger).Log("msg", "partition has no records", "partition", p.partition)
294 continue
295 }
296 lastRecordTs, commitOffset, err := b.consumePartition(ctx, p)
297 if err != nil {
298 return 0, err
299 }
300 ps[i].lastRecordTs = lastRecordTs
301 ps[i].commitOffset = commitOffset
302 }
303
304 // Iterate over the laggiest partition until the lag is less than the cycle duration or none of the partitions has records
305 for {
306 sort.Slice(ps, func(i, j int) bool {
307 return ps[i].lastRecordTs.Before(ps[j].lastRecordTs)
308 })
309
310 laggiestPartition := ps[0]
311 if laggiestPartition.lastRecordTs.IsZero() {
312 return b.cfg.ConsumeCycleDuration, errors.New("partition has no last record timestamp")
313 }
314
315 lagTime := time.Since(laggiestPartition.lastRecordTs)
316 if lagTime < b.cfg.ConsumeCycleDuration {
317 return b.cfg.ConsumeCycleDuration - lagTime, nil
318 }
319 // If we don't know exact offset, we need to start over on next cycle to fetch offsets again.

Calls 11

getAssignedPartitionsMethod · 0.95
fetchPartitionsMethod · 0.95
consumePartitionMethod · 0.95
formatActivePartitionsFunction · 0.85
hasRecordsMethod · 0.80
StartMethod · 0.65
LogMethod · 0.65
ObserveMethod · 0.65
NowMethod · 0.65
ClearMethod · 0.65
StringMethod · 0.45