| 623 | } |
| 624 | |
| 625 | func (s *LiveStore) consume(ctx context.Context, rs recordIter, now time.Time) (*kadm.Offset, error) { |
| 626 | defer s.decoder.Reset() |
| 627 | ctx, span := tracer.Start(ctx, "LiveStore.consume") |
| 628 | defer span.End() |
| 629 | |
| 630 | recordCount := 0 |
| 631 | var lastRecord *kgo.Record |
| 632 | |
| 633 | cutoff := now.Add(-s.cfg.CompleteBlockTimeout) |
| 634 | // Process records by tenant |
| 635 | for !rs.Done() { |
| 636 | record := rs.Next() |
| 637 | tenant := string(record.Key) |
| 638 | |
| 639 | // Track partition lag in seconds |
| 640 | lag := now.Sub(record.Timestamp) |
| 641 | ingest.SetPartitionLagSeconds(s.cfg.IngestConfig.Kafka.ConsumerGroup, record.Partition, lag) |
| 642 | |
| 643 | if record.Timestamp.Before(cutoff) { |
| 644 | metricRecordsDropped.WithLabelValues(tenant, droppedRecordReasonTooOld).Inc() |
| 645 | lastRecord = record |
| 646 | continue |
| 647 | } |
| 648 | |
| 649 | s.decoder.Reset() |
| 650 | pushReq, err := s.decoder.Decode(record.Value) |
| 651 | if err != nil { |
| 652 | metricRecordsDropped.WithLabelValues(tenant, droppedRecordReasonDecodingFailed).Inc() |
| 653 | level.Error(s.logger).Log("msg", "failed to decoded record", "tenant", tenant, "err", err) |
| 654 | span.RecordError(err) |
| 655 | lastRecord = record |
| 656 | continue |
| 657 | } |
| 658 | |
| 659 | // Get or create tenant instance |
| 660 | inst, err := s.getOrCreateInstance(tenant) |
| 661 | if err != nil { |
| 662 | metricRecordsDropped.WithLabelValues(tenant, droppedRecordReasonInstanceNotFound).Inc() |
| 663 | level.Error(s.logger).Log("msg", "failed to get instance for tenant", "tenant", tenant, "err", err) |
| 664 | span.RecordError(err) |
| 665 | lastRecord = record |
| 666 | continue |
| 667 | } |
| 668 | |
| 669 | // Push data to tenant instance |
| 670 | inst.pushBytes(ctx, record.Timestamp, pushReq) |
| 671 | |
| 672 | metricRecordsProcessed.WithLabelValues(tenant).Inc() |
| 673 | recordCount++ |
| 674 | lastRecord = record |
| 675 | } |
| 676 | |
| 677 | span.SetAttributes(attribute.Int("records_count", recordCount)) |
| 678 | |
| 679 | if lastRecord == nil { |
| 680 | return nil, nil |
| 681 | } |
| 682 | |