(ctx context.Context, userID string, keys []uint32, traces []*rebatchedTrace, skipMetricsGeneration bool)
| 590 | } |
| 591 | |
| 592 | func (d *Distributor) sendToKafka(ctx context.Context, userID string, keys []uint32, traces []*rebatchedTrace, skipMetricsGeneration bool) error { |
| 593 | marshalledTraces := make([][]byte, len(traces)) |
| 594 | for i, t := range traces { |
| 595 | b, err := proto.Marshal(t.trace) |
| 596 | if err != nil { |
| 597 | return fmt.Errorf("failed to marshal trace: %w", err) |
| 598 | } |
| 599 | marshalledTraces[i] = b |
| 600 | } |
| 601 | |
| 602 | partitionRing, err := d.partitionRing.PartitionRing().ShuffleShard(userID, d.overrides.IngestionTenantShardSize(userID)) |
| 603 | if err != nil { |
| 604 | return fmt.Errorf("failed to shuffle shard: %w", err) |
| 605 | } |
| 606 | return ring.DoBatchWithOptions(ctx, ring.Write, ring.NewActivePartitionBatchRing(partitionRing), keys, func(partition ring.InstanceDesc, indexes []int) error { |
| 607 | localCtx, cancel := context.WithTimeout(ctx, d.cfg.KafkaConfig.WriteTimeout) |
| 608 | defer cancel() |
| 609 | localCtx = user.InjectOrgID(localCtx, userID) |
| 610 | |
| 611 | req := &tempopb.PushBytesRequest{ |
| 612 | Traces: make([]tempopb.PreallocBytes, len(indexes)), |
| 613 | Ids: make([][]byte, len(indexes)), |
| 614 | SkipMetricsGeneration: skipMetricsGeneration, |
| 615 | } |
| 616 | |
| 617 | for i, j := range indexes { |
| 618 | req.Traces[i].Slice = marshalledTraces[j][0:] |
| 619 | req.Ids[i] = traces[j].id |
| 620 | } |
| 621 | |
| 622 | // The partition ID is stored in the ring.InstanceDesc ID. |
| 623 | partitionID, err := strconv.ParseInt(partition.Id, 10, 32) |
| 624 | if err != nil { |
| 625 | return err |
| 626 | } |
| 627 | |
| 628 | records, err := ingest.Encode(int32(partitionID), userID, req, d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes) |
| 629 | if err != nil { |
| 630 | return fmt.Errorf("failed to encode PushSpansRequest: %w", err) |
| 631 | } |
| 632 | |
| 633 | metricKafkaRecordsPerRequest.Observe(float64(len(records))) |
| 634 | |
| 635 | startTime := time.Now() |
| 636 | produceResults := d.kafkaProducer.ProduceSync(localCtx, records) |
| 637 | metricKafkaWriteLatency.Observe(time.Since(startTime).Seconds()) |
| 638 | |
| 639 | partitionLabel := fmt.Sprintf("partition_%d", partitionID) |
| 640 | count := 0 |
| 641 | sizeBytes := 0 |
| 642 | for _, result := range produceResults { |
| 643 | if result.Err != nil { |
| 644 | _ = level.Error(d.logger).Log("msg", "failed to write to kafka", "err", result.Err, "tenant", userID) |
| 645 | } else { |
| 646 | count++ |
| 647 | sizeBytes += len(result.Record.Value) |
| 648 | } |
| 649 | } |
no test coverage detected