(ctx context.Context)
| 79 | } |
| 80 | |
| 81 | func (i *liveTracesIter) iter(ctx context.Context) { |
| 82 | i.mtx.Lock() |
| 83 | defer i.mtx.Unlock() |
| 84 | defer close(i.ch) |
| 85 | |
| 86 | // Get the list of all traces sorted by ID |
| 87 | entries := make([]entry, 0, len(i.liveTraces.Traces)) |
| 88 | for hash, t := range i.liveTraces.Traces { |
| 89 | entries = append(entries, entry{t.ID, hash}) |
| 90 | } |
| 91 | slices.SortFunc(entries, func(a, b entry) int { |
| 92 | return bytes.Compare(a.id, b.id) |
| 93 | }) |
| 94 | |
| 95 | // h and buffer are reused across all spans to avoid repeated allocations. |
| 96 | h := util.NewTokenHasher() |
| 97 | buffer := make([]byte, 4) |
| 98 | // seen is reused across traces to avoid repeated allocations. |
| 99 | seen := make(map[uint64]struct{}, 1024) |
| 100 | |
| 101 | // Begin sending to channel in chunks to reduce channel overhead. |
| 102 | seq := slices.Chunk(entries, 10) |
| 103 | for entries := range seq { |
| 104 | output := make([]chEntry, 0, len(entries)) |
| 105 | |
| 106 | for _, e := range entries { |
| 107 | |
| 108 | entry := i.liveTraces.Traces[e.hash] |
| 109 | |
| 110 | tr := new(tempopb.Trace) |
| 111 | |
| 112 | for _, b := range entry.Batches { |
| 113 | // This unmarshal appends the batches onto the existing tempopb.Trace |
| 114 | // so we don't need to allocate another container temporarily |
| 115 | err := tr.Unmarshal(b) |
| 116 | if err != nil { |
| 117 | i.ch <- []chEntry{{err: err}} |
| 118 | return |
| 119 | } |
| 120 | } |
| 121 | |
| 122 | // Deduplicate spans and update block timestamp bounds in one pass. |
| 123 | for _, rs := range tr.ResourceSpans { |
| 124 | for _, ss := range rs.ScopeSpans { |
| 125 | unique := ss.Spans[:0] |
| 126 | for _, s := range ss.Spans { |
| 127 | token := util.TokenForID(h, buffer, int32(s.Kind), s.SpanId) |
| 128 | if _, ok := seen[token]; ok { |
| 129 | i.dedupedSpans++ |
| 130 | continue |
| 131 | } |
| 132 | seen[token] = struct{}{} |
| 133 | unique = append(unique, s) |
| 134 | if i.start == 0 || s.StartTimeUnixNano < i.start { |
| 135 | i.start = s.StartTimeUnixNano |
| 136 | } |
| 137 | if s.EndTimeUnixNano > i.end { |
| 138 | i.end = s.EndTimeUnixNano |
no test coverage detected