| 192 | } |
| 193 | |
| 194 | func (i *instance) backpressure(ctx context.Context) bool { |
| 195 | span := oteltrace.SpanFromContext(ctx) |
| 196 | |
| 197 | if i.Cfg.MaxLiveTracesBytes > 0 { |
| 198 | // Check live traces |
| 199 | |
| 200 | i.liveTracesMtx.Lock() |
| 201 | sz := i.liveTraces.Size() |
| 202 | i.liveTracesMtx.Unlock() |
| 203 | |
| 204 | if sz >= i.Cfg.MaxLiveTracesBytes { |
| 205 | // Live traces exceeds the expected amount of data in per wal flush, |
| 206 | // so wait a bit. |
| 207 | select { |
| 208 | case <-ctx.Done(): |
| 209 | return false |
| 210 | case <-time.After(1 * time.Second): |
| 211 | } |
| 212 | |
| 213 | span.AddEvent("backpressure", oteltrace.WithAttributes( |
| 214 | attribute.String("reason", reasonWaitingForLiveTraces), |
| 215 | )) |
| 216 | metricBackPressure.WithLabelValues(reasonWaitingForLiveTraces).Inc() |
| 217 | return true |
| 218 | } |
| 219 | } |
| 220 | |
| 221 | // Check outstanding wal blocks |
| 222 | count := len(i.blocks.Load().walBlocks) |
| 223 | |
| 224 | if count > walBackpressureLimit { |
| 225 | // There are multiple outstanding WAL blocks that need completion |
| 226 | // so wait a bit. |
| 227 | select { |
| 228 | case <-ctx.Done(): |
| 229 | return false |
| 230 | case <-time.After(1 * time.Second): |
| 231 | } |
| 232 | |
| 233 | metricBackPressure.WithLabelValues(reasonWaitingForWAL).Inc() |
| 234 | span.AddEvent("backpressure", oteltrace.WithAttributes( |
| 235 | attribute.String("reason", reasonWaitingForWAL), |
| 236 | )) |
| 237 | return true |
| 238 | } |
| 239 | |
| 240 | return false |
| 241 | } |
| 242 | |
| 243 | func (i *instance) pushBytes(ctx context.Context, ts time.Time, req *tempopb.PushBytesRequest) { |
| 244 | if len(req.Traces) != len(req.Ids) { |