MCPcopy
hub / github.com/grafana/tempo / requestsByTraceID

Function requestsByTraceID

modules/distributor/distributor.go:664–786  ·  view source on GitHub ↗

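requestsByTraceID groups ResourceSpans by trace ID, producing hash-ring tokens and rebatched traces for downstream write-path processing. When maxSpanAttrSize is greater than zero, it truncates oversized resource, scope, span, event, and link attributes, returning the per-level truncation counts and the first truncation example (if any) for diagnostic logging. If any span carries an invalid trace ID or span ID, the discarded spans are recorded for the tenant and the whole call fails with an InvalidArgument error.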

(batches []*v1.ResourceSpans, userID string, spanCount, maxSpanAttrSize int)

Source from the content-addressed store, hash-verified

662// rebatched traces for downstream write-path processing. It truncates oversized attributes
663// and returns the first truncation example (if any) for diagnostic logging.
664func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, maxSpanAttrSize int) ([]uint32, []*rebatchedTrace, truncatedAttributesCount, *truncatedAttrInfo, error) {
665 const tracesPerBatch = 20 // p50 of internal env
666 tracesByID := make(map[uint64]*rebatchedTrace, tracesPerBatch)
667 truncatedCount := truncatedAttributesCount{}
668
669 // truncationExample captures one example of a truncated attribute for rate-limited logging.
670 var truncationExample truncatedAttrInfo
671
672 currentTime := uint32(time.Now().Unix())
673 for _, b := range batches {
674 spansByILS := make(map[uint64]*v1.ScopeSpans)
675 // check resource for large attributes
676 if maxSpanAttrSize > 0 && b.Resource != nil {
677 truncatedCount.Resource += processAttributes(b.Resource.Attributes, maxSpanAttrSize, &truncationExample, "resource")
678 }
679
680 for _, ils := range b.ScopeSpans {
681
682 // check instrumentation for large attributes
683 if maxSpanAttrSize > 0 && ils.Scope != nil {
684 truncatedCount.Scope += processAttributes(ils.Scope.Attributes, maxSpanAttrSize, &truncationExample, "scope")
685 }
686
687 for _, span := range ils.Spans {
688 // check spans for large attributes
689 if maxSpanAttrSize > 0 {
690 truncatedCount.Span += processAttributes(span.Attributes, maxSpanAttrSize, &truncationExample, "span")
691
692 // check large attributes for events and links
693 for _, event := range span.Events {
694 truncatedCount.Event += processAttributes(event.Attributes, maxSpanAttrSize, &truncationExample, "event")
695 }
696
697 for _, link := range span.Links {
698 truncatedCount.Link += processAttributes(link.Attributes, maxSpanAttrSize, &truncationExample, "link")
699 }
700 }
701 traceID := span.TraceId
702 if !validation.ValidTraceID(traceID) {
703 overrides.RecordDiscardedSpans(spanCount, overrides.ReasonInvalidTraceID, userID)
704 return nil, nil, truncatedAttributesCount{}, nil, status.Errorf(codes.InvalidArgument, "trace ids must be 128 bit, received %d bits", len(traceID)*8)
705 }
706
707 if !validation.ValidSpanID(span.SpanId) {
708 overrides.RecordDiscardedSpans(spanCount, overrides.ReasonInvalidSpanID, userID)
709 return nil, nil, truncatedAttributesCount{}, nil, status.Errorf(codes.InvalidArgument, "span ids must be 64 bit and not all zero, received %d bits", len(span.SpanId)*8)
710 }
711
712 traceKey := util.HashForTraceID(traceID)
713 ilsKey := traceKey
714 if ils.Scope != nil {
715 ilsKey = fnv1a.AddString64(ilsKey, ils.Scope.Name)
716 ilsKey = fnv1a.AddString64(ilsKey, ils.Scope.Version)
717 }
718
719 existingILS, ilsAdded := spansByILS[ilsKey]
720 if !ilsAdded {
721 existingILS = &v1.ScopeSpans{

Callers 8

TestRequestsByTraceID · Function · 0.85
TestProcessAttributes · Function · 0.85
TestForwarder · Function · 0.85
TestForwarder_shutdown · Function · 0.85
PushTraces · Method · 0.85

Calls 9

ValidTraceID · Function · 0.92
RecordDiscardedSpans · Function · 0.92
ValidSpanID · Function · 0.92
HashForTraceID · Function · 0.92
TokenFor · Function · 0.92
processAttributes · Function · 0.85
startEndFromSpan · Function · 0.85
Now · Method · 0.65
Observe · Method · 0.65

Tested by 7

TestRequestsByTraceID · Function · 0.68
TestProcessAttributes · Function · 0.68
TestForwarder · Function · 0.68
TestForwarder_shutdown · Function · 0.68