MCPcopy
hub / github.com/grafana/tempo / consume

Method consume

modules/livestore/live_store.go:625–688  ·  view source on GitHub ↗
(ctx context.Context, rs recordIter, now time.Time)

Source from the content-addressed store, hash-verified

623}
624
625func (s *LiveStore) consume(ctx context.Context, rs recordIter, now time.Time) (*kadm.Offset, error) {
626 defer s.decoder.Reset()
627 ctx, span := tracer.Start(ctx, "LiveStore.consume")
628 defer span.End()
629
630 recordCount := 0
631 var lastRecord *kgo.Record
632
633 cutoff := now.Add(-s.cfg.CompleteBlockTimeout)
634 // Process records by tenant
635 for !rs.Done() {
636 record := rs.Next()
637 tenant := string(record.Key)
638
639 // Track partition lag in seconds
640 lag := now.Sub(record.Timestamp)
641 ingest.SetPartitionLagSeconds(s.cfg.IngestConfig.Kafka.ConsumerGroup, record.Partition, lag)
642
643 if record.Timestamp.Before(cutoff) {
644 metricRecordsDropped.WithLabelValues(tenant, droppedRecordReasonTooOld).Inc()
645 lastRecord = record
646 continue
647 }
648
649 s.decoder.Reset()
650 pushReq, err := s.decoder.Decode(record.Value)
651 if err != nil {
652 metricRecordsDropped.WithLabelValues(tenant, droppedRecordReasonDecodingFailed).Inc()
653 level.Error(s.logger).Log("msg", "failed to decoded record", "tenant", tenant, "err", err)
654 span.RecordError(err)
655 lastRecord = record
656 continue
657 }
658
659 // Get or create tenant instance
660 inst, err := s.getOrCreateInstance(tenant)
661 if err != nil {
662 metricRecordsDropped.WithLabelValues(tenant, droppedRecordReasonInstanceNotFound).Inc()
663 level.Error(s.logger).Log("msg", "failed to get instance for tenant", "tenant", tenant, "err", err)
664 span.RecordError(err)
665 lastRecord = record
666 continue
667 }
668
669 // Push data to tenant instance
670 inst.pushBytes(ctx, record.Timestamp, pushReq)
671
672 metricRecordsProcessed.WithLabelValues(tenant).Inc()
673 recordCount++
674 lastRecord = record
675 }
676
677 span.SetAttributes(attribute.Int("records_count", recordCount))
678
679 if lastRecord == nil {
680 return nil, nil
681 }
682

Calls 14

getOrCreateInstance · Method · 0.95
SetPartitionLagSeconds · Function · 0.92
Int · Method · 0.80
Reset · Method · 0.65
Start · Method · 0.65
Add · Method · 0.65
Done · Method · 0.65
Next · Method · 0.65
Inc · Method · 0.65
Decode · Method · 0.65
Log · Method · 0.65
Error · Method · 0.65