MCPcopy
hub / github.com/grafana/tempo / consumePartition

Method consumePartition

modules/blockbuilder/blockbuilder.go:333–503  ·  view source on GitHub ↗
(ctx context.Context, ps partitionState)

Source from the content-addressed store, hash-verified

331}
332
333func (b *BlockBuilder) consumePartition(ctx context.Context, ps partitionState) (lastTs time.Time, commitOffset int64, err error) {
334 ctx, span := tracer.Start(ctx, "blockbuilder.consumePartition",
335 trace.WithAttributes(attribute.Int("partition", int(ps.partition)),
336 attribute.String("last_record_ts", ps.lastRecordTs.String())))
337
338 defer func(t time.Time) {
339 metricProcessPartitionSectionDuration.WithLabelValues(strconv.Itoa(int(ps.partition))).Observe(time.Since(t).Seconds())
340 span.End()
341 }(time.Now())
342
343 var (
344 dur = b.cfg.ConsumeCycleDuration
345 topic = b.cfg.IngestStorageConfig.Kafka.Topic
346 group = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup
347 maxBytesPerCycle = b.cfg.MaxBytesPerCycle
348 partLabel = strconv.Itoa(int(ps.partition))
349 consumedBytes uint64
350 startOffset kgo.Offset
351 init bool
352 writer *writer
353 lastRec *kgo.Record
354 end time.Time
355 processedRecords int
356 )
357
358 startOffset = ps.getStartOffset()
359
360 level.Info(b.logger).Log(
361 "msg", "consuming partition",
362 "partition", ps.partition,
363 "commit_offset", ps.commitOffset,
364 "start_offset", startOffset,
365 )
366 // We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment).
367 // This is so the cycle starts exactly at the commit offset, and not at what was (potentially over-) consumed previously.
368 // In the end, we remove the partition from the client (refer to the defer below) to guarantee the client always consumes
369 // from one partition at a time. I.e. when this partition is consumed, we start consuming the next one.
370 b.kafkaClient.AddConsumePartitions(map[string]map[int32]kgo.Offset{
371 topic: {
372 ps.partition: startOffset,
373 },
374 })
375 defer b.kafkaClient.RemoveConsumePartitions(map[string][]int32{topic: {ps.partition}})
376
377outer:
378 for {
379 fetches := func() kgo.Fetches {
380 defer func(t time.Time) { metricFetchDuration.WithLabelValues(partLabel).Observe(time.Since(t).Seconds()) }(time.Now())
381 ctx2, cancel := context.WithTimeout(ctx, pollTimeout)
382 defer cancel()
383 return b.kafkaClient.PollFetches(ctx2)
384 }()
385 err = fetches.Err()
386 if err != nil {
387 if errors.Is(err, context.DeadlineExceeded) {
388 // No more data
389 break
390 }

Callers 1

consumeMethod · 0.95

Calls 15

pushTracesMethod · 0.95
commitOffsetMethod · 0.95
SetPartitionLagSecondsFunction · 0.92
IntMethod · 0.80
getStartOffsetMethod · 0.80
allowCompactionMethod · 0.80
StartMethod · 0.65
ObserveMethod · 0.65
NowMethod · 0.65
LogMethod · 0.65
IncMethod · 0.65

Tested by

no test coverage detected