MCPcopy
hub / github.com/grafana/tempo / backpressure

Method backpressure

modules/livestore/instance.go:194–241  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

192}
193
194func (i *instance) backpressure(ctx context.Context) bool {
195 span := oteltrace.SpanFromContext(ctx)
196
197 if i.Cfg.MaxLiveTracesBytes > 0 {
198 // Check live traces
199
200 i.liveTracesMtx.Lock()
201 sz := i.liveTraces.Size()
202 i.liveTracesMtx.Unlock()
203
204 if sz >= i.Cfg.MaxLiveTracesBytes {
205 // Live traces exceeds the expected amount of data in per wal flush,
206 // so wait a bit.
207 select {
208 case <-ctx.Done():
209 return false
210 case <-time.After(1 * time.Second):
211 }
212
213 span.AddEvent("backpressure", oteltrace.WithAttributes(
214 attribute.String("reason", reasonWaitingForLiveTraces),
215 ))
216 metricBackPressure.WithLabelValues(reasonWaitingForLiveTraces).Inc()
217 return true
218 }
219 }
220
221 // Check outstanding wal blocks
222 count := len(i.blocks.Load().walBlocks)
223
224 if count > walBackpressureLimit {
225 // There are multiple outstanding WAL blocks that need completion
226 // so wait a bit.
227 select {
228 case <-ctx.Done():
229 return false
230 case <-time.After(1 * time.Second):
231 }
232
233 metricBackPressure.WithLabelValues(reasonWaitingForWAL).Inc()
234 span.AddEvent("backpressure", oteltrace.WithAttributes(
235 attribute.String("reason", reasonWaitingForWAL),
236 ))
237 return true
238 }
239
240 return false
241}
242
243func (i *instance) pushBytes(ctx context.Context, ts time.Time, req *tempopb.PushBytesRequest) {
244 if len(req.Traces) != len(req.Ids) {

Callers 2

waitBackpressureMethod · 0.95

Calls 4

SizeMethod · 0.65
DoneMethod · 0.65
IncMethod · 0.65
StringMethod · 0.45

Tested by 1