Observe processes trace batches for usage tracking in the hot path. NOTE: this is performance sensitive code, because it is called on every ingested span. you should consider the performance impact of a change made here.
(tenant string, batches []*v1.ResourceSpans)
| 258 | // NOTE: this is performance sensitive code, because it is called on every ingested span. |
| 259 | // you should consider the performance impact of a change made here. |
| 260 | func (u *Tracker) Observe(tenant string, batches []*v1.ResourceSpans) { |
| 261 | dimensions := u.labelsFn(tenant) |
| 262 | if len(dimensions) == 0 { |
| 263 | // Not configured |
| 264 | // TODO - Should we put it all in the unattributed bucket instead? |
| 265 | return |
| 266 | } |
| 267 | |
| 268 | max := u.maxFn(tenant) |
| 269 | if max == 0 { |
| 270 | max = u.cfg.MaxCardinality |
| 271 | } |
| 272 | |
| 273 | u.mtx.Lock() |
| 274 | defer u.mtx.Unlock() |
| 275 | |
| 276 | var ( |
| 277 | now = time.Now().Unix() |
| 278 | data = u.getTenant(tenant) |
| 279 | mapping, buffer1, buffer2, last = data.GetBuffersForDimensions(dimensions) |
| 280 | ) |
| 281 | |
| 282 | for _, batch := range batches { |
| 283 | unaccountedForBatchData, totalSpanCount := nonSpanDataLength(batch) |
| 284 | |
| 285 | if totalSpanCount == 0 { |
| 286 | // Mainly to prevent a panic below, but is this even possible? |
| 287 | continue |
| 288 | } |
| 289 | |
| 290 | // This is 1/Nth of the unaccounted for batch data that gets added to each span. |
| 291 | // Adding this incrementally as we go through the spans is the fastest method, but |
| 292 | // loses some precision. The other (original) implementation is to record span counts |
| 293 | // per series into a map and reconcile at the end. That method has more accurate data because |
| 294 | // it performs the floating point math once on the total, instead of accumulating 1/N + 1/N ... errors. |
| 295 | batchPortion := int(math.RoundToEven(float64(unaccountedForBatchData) / float64(totalSpanCount))) |
| 296 | |
| 297 | // To account for the accumulated error we dump the remaining delta onto the first span, which can be negative. |
| 298 | // The result ensures the total recorded bytes matches the input. |
| 299 | firstSpanPortion := unaccountedForBatchData - batchPortion*totalSpanCount |
| 300 | |
| 301 | // Reset value buffer for every batch. |
| 302 | for k := range buffer1 { |
| 303 | buffer1[k] = missingLabel |
| 304 | } |
| 305 | |
| 306 | if batch.Resource != nil { |
| 307 | for _, m := range mapping { |
| 308 | // Check ScopeAll first since most users use unscoped attributes (short-circuit optimization) |
| 309 | if m.scope == ScopeAll || m.scope == ScopeResource { |
| 310 | for _, a := range batch.Resource.Attributes { |
| 311 | v := a.Value.GetStringValue() |
| 312 | if v == "" { |
| 313 | continue |
| 314 | } |
| 315 | if a.Key == m.from { |
| 316 | buffer1[m.to] = v |
| 317 | break |