MCPcopy
hub / github.com/grafana/tempo / pushBytes

Method pushBytes

modules/livestore/instance.go:243–324  ·  view source on GitHub ↗
(ctx context.Context, ts time.Time, req *tempopb.PushBytesRequest)

Source from the content-addressed store, hash-verified

241}
242
243func (i *instance) pushBytes(ctx context.Context, ts time.Time, req *tempopb.PushBytesRequest) {
244 if len(req.Traces) != len(req.Ids) {
245 level.Error(i.logger).Log("msg", "mismatched traces and ids length", "IDs", len(req.Ids), "traces", len(req.Traces))
246 return
247 }
248
249 // Wait for room in pipeline if needed
250 i.waitBackpressure(ctx)
251
252 if err := ctx.Err(); err != nil {
253 level.Error(i.logger).Log("msg", "failed to push bytes to instance", "err", err)
254 return
255 }
256
257 // Check tenant limits
258 maxBytes := i.overrides.MaxBytesPerTrace(i.tenantID)
259 maxLiveTraces := i.overrides.MaxLocalTracesPerUser(i.tenantID)
260
261 // Reuse a single Trace across iterations to preserve slice capacity.
262 // The ResourceSpans pointers are handed off to liveTraces, but the
263 // Trace wrapper and its backing array can be recycled.
264 trace := &tempopb.Trace{}
265
266 // For each pre-marshalled trace, we need to unmarshal it and push to live traces
267 for j, traceBytes := range req.Traces {
268 traceID := req.Ids[j]
269 // measure received bytes as sum of slice lengths
270 // type byte is guaranteed to be 1 byte in size
271 // ref: https://golang.org/ref/spec#Size_and_alignment_guarantees
272 i.bytesReceivedTotal.WithLabelValues(i.tenantID, traceDataType).Add(float64(len(traceBytes.Slice)))
273
274 // Capture proto size before returning bytes to pool.
275 traceSz := len(traceBytes.Slice)
276
277 // Clear stale pointers so prior iterations' ResourceSpans can be
278 // GC'd, then truncate so Unmarshal appends into the existing backing array.
279 clear(trace.ResourceSpans)
280 trace.ResourceSpans = trace.ResourceSpans[:0]
281 if err := trace.Unmarshal(traceBytes.Slice); err != nil {
282 level.Error(i.logger).Log("msg", "failed to unmarshal trace", "err", err)
283 continue
284 }
285
286 // Reuse the byte slice now that we've unmarshalled it
287 tempopb.ReuseByteSlices([][]byte{traceBytes.Slice})
288
289 // test max trace size. use trace sizes over liveTraces b/c it tracks large traces across multiple flushes
290 if maxBytes > 0 {
291 allowResult := i.traceSizes.Allow(traceID, traceSz, maxBytes)
292 if !allowResult.IsAllowed {
293 i.maxTraceLogger.Log("msg", overrides.ErrorPrefixTraceTooLarge, "max", maxBytes, "traceSz", traceSz, "totalSize", allowResult.CurrentTotalSize, "trace", hex.EncodeToString(traceID), "insight", true)
294 overrides.RecordDiscardedSpans(countSpans(trace), overrides.ReasonTraceTooLarge, i.tenantID)
295 continue
296 }
297 }
298
299 i.liveTracesMtx.Lock()
300 // Push each batch in the trace to live traces

Callers

nothing calls this directly

Calls 13

waitBackpressure · Method · 0.95
Unmarshal · Method · 0.95
ReuseByteSlices · Function · 0.92
RecordDiscardedSpans · Function · 0.92
Allow · Method · 0.80
EncodeToString · Method · 0.80
countSpans · Function · 0.70
Log · Method · 0.65
Error · Method · 0.65
MaxBytesPerTrace · Method · 0.65
MaxLocalTracesPerUser · Method · 0.65

Tested by

no test coverage detected