| 1862 | } |
| 1863 | |
| 1864 | func (h *HistogramAggregator) Combine(in []*tempopb.TimeSeries) { |
| 1865 | for _, ts := range in { |
| 1866 | // Convert proto labels to traceql labels while stripping the bucket label |
| 1867 | var bucket Static |
| 1868 | h.labelBuffer, bucket = convertProtoLabelsToTraceQL(ts.Labels, true) |
| 1869 | |
| 1870 | if bucket.Type == TypeNil { |
| 1871 | // Bad __bucket label? |
| 1872 | continue |
| 1873 | } |
| 1874 | |
| 1875 | withoutBucketStr := h.labelBuffer.String() |
| 1876 | |
| 1877 | existing, ok := h.ss[withoutBucketStr] |
| 1878 | if !ok { |
| 1879 | // Create a copy of the labels since the buffer will be reused |
| 1880 | withoutBucket := make(Labels, len(h.labelBuffer)) |
| 1881 | copy(withoutBucket, h.labelBuffer) |
| 1882 | |
| 1883 | // Create a fresh histogram slice for each series to avoid sharing memory |
| 1884 | // Pool reuse was causing data contamination between series |
| 1885 | histSlice := make([]Histogram, h.intervalMapper.IntervalCount()) |
| 1886 | |
| 1887 | existing = histSeries{ |
| 1888 | labels: withoutBucket, |
| 1889 | hist: histSlice, |
| 1890 | // Pre-allocate exemplars slice with capacity hint to reduce allocations |
| 1891 | exemplars: make([]Exemplar, 0, len(ts.Exemplars)), |
| 1892 | exemplarBuckets: h.exemplarBucketsCreator(), |
| 1893 | } |
| 1894 | } |
| 1895 | |
| 1896 | b := bucket.Float() |
| 1897 | |
| 1898 | for _, sample := range ts.Samples { |
| 1899 | if sample.Value == 0 { |
| 1900 | continue |
| 1901 | } |
| 1902 | j := h.intervalMapper.IntervalMs(sample.TimestampMs) |
| 1903 | if j >= 0 && j < len(existing.hist) { |
| 1904 | existing.hist[j].Record(b, int(sample.Value)) |
| 1905 | } |
| 1906 | } |
| 1907 | |
| 1908 | // Collect exemplars per series, not globally |
| 1909 | for _, exemplar := range ts.Exemplars { |
| 1910 | if existing.exemplarBuckets.testTotal() { |
| 1911 | break |
| 1912 | } |
| 1913 | if existing.exemplarBuckets.addAndTest(uint64(exemplar.TimestampMs)) { |
| 1914 | continue // Skip this exemplar and continue, next exemplar might fit in a different bucket |
| 1915 | } |
| 1916 | |
| 1917 | // Convert exemplar labels directly (no need for buffer reuse here) |
| 1918 | labels, _ := convertProtoLabelsToTraceQL(exemplar.Labels, false) |
| 1919 | |
| 1920 | existing.exemplars = append(existing.exemplars, Exemplar{ |
| 1921 | Labels: labels, |