requestsByTraceID groups ResourceSpans by trace ID, producing hash-ring tokens and rebatched traces for downstream write-path processing. It truncates oversized attributes and returns the first truncation example (if any) for diagnostic logging.
(batches []*v1.ResourceSpans, userID string, spanCount, maxSpanAttrSize int)
| 662 | // rebatched traces for downstream write-path processing. It truncates oversized attributes |
| 663 | // and returns the first truncation example (if any) for diagnostic logging. |
| 664 | func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, maxSpanAttrSize int) ([]uint32, []*rebatchedTrace, truncatedAttributesCount, *truncatedAttrInfo, error) { |
| 665 | const tracesPerBatch = 20 // p50 of internal env |
| 666 | tracesByID := make(map[uint64]*rebatchedTrace, tracesPerBatch) |
| 667 | truncatedCount := truncatedAttributesCount{} |
| 668 | |
| 669 | // truncationExample captures one example of a truncated attribute for rate-limited logging. |
| 670 | var truncationExample truncatedAttrInfo |
| 671 | |
| 672 | currentTime := uint32(time.Now().Unix()) |
| 673 | for _, b := range batches { |
| 674 | spansByILS := make(map[uint64]*v1.ScopeSpans) |
| 675 | // check resource for large attributes |
| 676 | if maxSpanAttrSize > 0 && b.Resource != nil { |
| 677 | truncatedCount.Resource += processAttributes(b.Resource.Attributes, maxSpanAttrSize, &truncationExample, "resource") |
| 678 | } |
| 679 | |
| 680 | for _, ils := range b.ScopeSpans { |
| 681 | |
| 682 | // check instrumentation for large attributes |
| 683 | if maxSpanAttrSize > 0 && ils.Scope != nil { |
| 684 | truncatedCount.Scope += processAttributes(ils.Scope.Attributes, maxSpanAttrSize, &truncationExample, "scope") |
| 685 | } |
| 686 | |
| 687 | for _, span := range ils.Spans { |
| 688 | // check spans for large attributes |
| 689 | if maxSpanAttrSize > 0 { |
| 690 | truncatedCount.Span += processAttributes(span.Attributes, maxSpanAttrSize, &truncationExample, "span") |
| 691 | |
| 692 | // check large attributes for events and links |
| 693 | for _, event := range span.Events { |
| 694 | truncatedCount.Event += processAttributes(event.Attributes, maxSpanAttrSize, &truncationExample, "event") |
| 695 | } |
| 696 | |
| 697 | for _, link := range span.Links { |
| 698 | truncatedCount.Link += processAttributes(link.Attributes, maxSpanAttrSize, &truncationExample, "link") |
| 699 | } |
| 700 | } |
| 701 | traceID := span.TraceId |
| 702 | if !validation.ValidTraceID(traceID) { |
| 703 | overrides.RecordDiscardedSpans(spanCount, overrides.ReasonInvalidTraceID, userID) |
| 704 | return nil, nil, truncatedAttributesCount{}, nil, status.Errorf(codes.InvalidArgument, "trace ids must be 128 bit, received %d bits", len(traceID)*8) |
| 705 | } |
| 706 | |
| 707 | if !validation.ValidSpanID(span.SpanId) { |
| 708 | overrides.RecordDiscardedSpans(spanCount, overrides.ReasonInvalidSpanID, userID) |
| 709 | return nil, nil, truncatedAttributesCount{}, nil, status.Errorf(codes.InvalidArgument, "span ids must be 64 bit and not all zero, received %d bits", len(span.SpanId)*8) |
| 710 | } |
| 711 | |
| 712 | traceKey := util.HashForTraceID(traceID) |
| 713 | ilsKey := traceKey |
| 714 | if ils.Scope != nil { |
| 715 | ilsKey = fnv1a.AddString64(ilsKey, ils.Scope.Name) |
| 716 | ilsKey = fnv1a.AddString64(ilsKey, ils.Scope.Version) |
| 717 | } |
| 718 | |
| 719 | existingILS, ilsAdded := spansByILS[ilsKey] |
| 720 | if !ilsAdded { |
| 721 | existingILS = &v1.ScopeSpans{ |