MCPcopy
hub / github.com/grafana/tempo / Observe

Method Observe

modules/distributor/usage/tracker.go:260–372  ·  view source on GitHub ↗

Observe processes trace batches for usage tracking in the hot path. NOTE: this is performance sensitive code, because it is called on every ingested span. you should consider the performance impact of a change made here.

(tenant string, batches []*v1.ResourceSpans)

Source from the content-addressed store, hash-verified

258// NOTE: this is performance sensitive code, because it is called on every ingested span.
259// you should consider the performance impact of a change made here.
260func (u *Tracker) Observe(tenant string, batches []*v1.ResourceSpans) {
261 dimensions := u.labelsFn(tenant)
262 if len(dimensions) == 0 {
263 // Not configured
264 // TODO - Should we put it all in the unattributed bucket instead?
265 return
266 }
267
268 max := u.maxFn(tenant)
269 if max == 0 {
270 max = u.cfg.MaxCardinality
271 }
272
273 u.mtx.Lock()
274 defer u.mtx.Unlock()
275
276 var (
277 now = time.Now().Unix()
278 data = u.getTenant(tenant)
279 mapping, buffer1, buffer2, last = data.GetBuffersForDimensions(dimensions)
280 )
281
282 for _, batch := range batches {
283 unaccountedForBatchData, totalSpanCount := nonSpanDataLength(batch)
284
285 if totalSpanCount == 0 {
286 // Mainly to prevent a panic below, but is this even possible?
287 continue
288 }
289
290 // This is 1/Nth of the unaccounted for batch data that gets added to each span.
291 // Adding this incrementally as we go through the spans is the fastest method, but
292 // loses some precision. The other (original) implementation is to record span counts
293 // per series into a map and reconcile at the end. That method has more accurate data because
294 // it performs the floating point math once on the total, instead of accumulating 1/N + 1/N ... errors.
295 batchPortion := int(math.RoundToEven(float64(unaccountedForBatchData) / float64(totalSpanCount)))
296
297 // To account for the accumulated error we dump the remaining delta onto the first span, which can be negative.
298 // The result ensures the total recorded bytes matches the input.
299 firstSpanPortion := unaccountedForBatchData - batchPortion*totalSpanCount
300
301 // Reset value buffer for every batch.
302 for k := range buffer1 {
303 buffer1[k] = missingLabel
304 }
305
306 if batch.Resource != nil {
307 for _, m := range mapping {
308 // Check ScopeAll first since most users use unscoped attributes (short-circuit optimization)
309 if m.scope == ScopeAll || m.scope == ScopeResource {
310 for _, a := range batch.Resource.Attributes {
311 v := a.Value.GetStringValue()
312 if v == "" {
313 continue
314 }
315 if a.Key == m.from {
316 buffer1[m.to] = v
317 break

Callers 5

TestUsageTrackerFunction · 0.95
TestCollectDoesNotPanicFunction · 0.95

Calls 10

getTenantMethod · 0.95
IncMethod · 0.95
nonSpanDataLengthFunction · 0.85
protoLengthMathFunction · 0.85
GetStringValueMethod · 0.80
NowMethod · 0.65
SizeMethod · 0.65
EqualMethod · 0.45
getSeriesMethod · 0.45

Tested by 5

TestUsageTrackerFunction · 0.76
TestCollectDoesNotPanicFunction · 0.76