MCPcopy
hub / github.com/grafana/tempo / sendToKafka

Method sendToKafka

modules/distributor/distributor.go:592–659  ·  view source on GitHub ↗
(ctx context.Context, userID string, keys []uint32, traces []*rebatchedTrace, skipMetricsGeneration bool)

Source from the content-addressed store, hash-verified

590}
591
592func (d *Distributor) sendToKafka(ctx context.Context, userID string, keys []uint32, traces []*rebatchedTrace, skipMetricsGeneration bool) error {
593 marshalledTraces := make([][]byte, len(traces))
594 for i, t := range traces {
595 b, err := proto.Marshal(t.trace)
596 if err != nil {
597 return fmt.Errorf("failed to marshal trace: %w", err)
598 }
599 marshalledTraces[i] = b
600 }
601
602 partitionRing, err := d.partitionRing.PartitionRing().ShuffleShard(userID, d.overrides.IngestionTenantShardSize(userID))
603 if err != nil {
604 return fmt.Errorf("failed to shuffle shard: %w", err)
605 }
606 return ring.DoBatchWithOptions(ctx, ring.Write, ring.NewActivePartitionBatchRing(partitionRing), keys, func(partition ring.InstanceDesc, indexes []int) error {
607 localCtx, cancel := context.WithTimeout(ctx, d.cfg.KafkaConfig.WriteTimeout)
608 defer cancel()
609 localCtx = user.InjectOrgID(localCtx, userID)
610
611 req := &tempopb.PushBytesRequest{
612 Traces: make([]tempopb.PreallocBytes, len(indexes)),
613 Ids: make([][]byte, len(indexes)),
614 SkipMetricsGeneration: skipMetricsGeneration,
615 }
616
617 for i, j := range indexes {
618 req.Traces[i].Slice = marshalledTraces[j][0:]
619 req.Ids[i] = traces[j].id
620 }
621
622 // The partition ID is stored in the ring.InstanceDesc ID.
623 partitionID, err := strconv.ParseInt(partition.Id, 10, 32)
624 if err != nil {
625 return err
626 }
627
628 records, err := ingest.Encode(int32(partitionID), userID, req, d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes)
629 if err != nil {
630 return fmt.Errorf("failed to encode PushSpansRequest: %w", err)
631 }
632
633 metricKafkaRecordsPerRequest.Observe(float64(len(records)))
634
635 startTime := time.Now()
636 produceResults := d.kafkaProducer.ProduceSync(localCtx, records)
637 metricKafkaWriteLatency.Observe(time.Since(startTime).Seconds())
638
639 partitionLabel := fmt.Sprintf("partition_%d", partitionID)
640 count := 0
641 sizeBytes := 0
642 for _, result := range produceResults {
643 if result.Err != nil {
644 _ = level.Error(d.logger).Log("msg", "failed to write to kafka", "err", result.Err, "tenant", userID)
645 } else {
646 count++
647 sizeBytes += len(result.Record.Value)
648 }
649 }

Callers 1

pushTracesKafkaMethod · 0.95

Calls 10

EncodeFunction · 0.92
ProduceSyncMethod · 0.80
MarshalMethod · 0.65
ObserveMethod · 0.65
NowMethod · 0.65
LogMethod · 0.65
ErrorMethod · 0.65
AddMethod · 0.65
PartitionRingMethod · 0.45

Tested by

no test coverage detected