(ctx context.Context, ts time.Time, req *tempopb.PushBytesRequest)
| 241 | } |
| 242 | |
| 243 | func (i *instance) pushBytes(ctx context.Context, ts time.Time, req *tempopb.PushBytesRequest) { |
| 244 | if len(req.Traces) != len(req.Ids) { |
| 245 | level.Error(i.logger).Log("msg", "mismatched traces and ids length", "IDs", len(req.Ids), "traces", len(req.Traces)) |
| 246 | return |
| 247 | } |
| 248 | |
| 249 | // Wait for room in pipeline if needed |
| 250 | i.waitBackpressure(ctx) |
| 251 | |
| 252 | if err := ctx.Err(); err != nil { |
| 253 | level.Error(i.logger).Log("msg", "failed to push bytes to instance", "err", err) |
| 254 | return |
| 255 | } |
| 256 | |
| 257 | // Check tenant limits |
| 258 | maxBytes := i.overrides.MaxBytesPerTrace(i.tenantID) |
| 259 | maxLiveTraces := i.overrides.MaxLocalTracesPerUser(i.tenantID) |
| 260 | |
| 261 | // Reuse a single Trace across iterations to preserve slice capacity. |
| 262 | // The ResourceSpans pointers are handed off to liveTraces, but the |
| 263 | // Trace wrapper and its backing array can be recycled. |
| 264 | trace := &tempopb.Trace{} |
| 265 | |
| 266 | // For each pre-marshalled trace, we need to unmarshal it and push to live traces |
| 267 | for j, traceBytes := range req.Traces { |
| 268 | traceID := req.Ids[j] |
| 269 | // measure received bytes as sum of slice lengths |
| 270 | // type byte is guaranteed to be 1 byte in size |
| 271 | // ref: https://golang.org/ref/spec#Size_and_alignment_guarantees |
| 272 | i.bytesReceivedTotal.WithLabelValues(i.tenantID, traceDataType).Add(float64(len(traceBytes.Slice))) |
| 273 | |
| 274 | // Capture proto size before returning bytes to pool. |
| 275 | traceSz := len(traceBytes.Slice) |
| 276 | |
| 277 | // Clear stale pointers so prior iterations' ResourceSpans can be |
| 278 | // GC'd, then truncate so Unmarshal appends into the existing backing array. |
| 279 | clear(trace.ResourceSpans) |
| 280 | trace.ResourceSpans = trace.ResourceSpans[:0] |
| 281 | if err := trace.Unmarshal(traceBytes.Slice); err != nil { |
| 282 | level.Error(i.logger).Log("msg", "failed to unmarshal trace", "err", err) |
| 283 | continue |
| 284 | } |
| 285 | |
| 286 | // Reuse the byte slice now that we've unmarshalled it |
| 287 | tempopb.ReuseByteSlices([][]byte{traceBytes.Slice}) |
| 288 | |
| 289 | // test max trace size. use trace sizes over liveTraces b/c it tracks large traces across multiple flushes |
| 290 | if maxBytes > 0 { |
| 291 | allowResult := i.traceSizes.Allow(traceID, traceSz, maxBytes) |
| 292 | if !allowResult.IsAllowed { |
| 293 | i.maxTraceLogger.Log("msg", overrides.ErrorPrefixTraceTooLarge, "max", maxBytes, "traceSz", traceSz, "totalSize", allowResult.CurrentTotalSize, "trace", hex.EncodeToString(traceID), "insight", true) |
| 294 | overrides.RecordDiscardedSpans(countSpans(trace), overrides.ReasonTraceTooLarge, i.tenantID) |
| 295 | continue |
| 296 | } |
| 297 | } |
| 298 | |
| 299 | i.liveTracesMtx.Lock() |
| 300 | // Push each batch in the trace to live traces |
nothing calls this directly
no test coverage detected