MCPcopy
hub / github.com/grafana/tempo / checkForRateLimits

Method checkForRateLimits

modules/distributor/distributor.go:358–387  ·  view source on GitHub ↗

checkForRateLimits checks if the trace batch size exceeds the ingestion rate limit. It uses the ingestion rate limit that corresponds to the configured ingestion strategy. LocalIngestionRateStrategy: the ingestion rate limit is applied as is in each distributor. Example: if the ingestion rate limit is 10MB/s…

(tracesSize, spanCount int, userID string)

Source from the content-addressed store, hash-verified

356// example: if the ingestion rate limit is 10MB/s and the burst size is 20MB, and there are 5 healthy distributors,
357// then each distributor will allow 2MB/s with a burst of 20MB.
358func (d *Distributor) checkForRateLimits(tracesSize, spanCount int, userID string) error {
359 now := time.Now()
360 if !d.ingestionRateLimiter.AllowN(now, userID, tracesSize) {
361 overrides.RecordDiscardedSpans(spanCount, overrides.ReasonRateLimited, userID)
362 // limit: number of bytes per second allowed for the user, as per ingestion rate strategy
363 limit := int(d.ingestionRateLimiter.Limit(now, userID))
364 burst := d.ingestionRateLimiter.Burst(now, userID)
365
366 // globalLimit will be 0 when using local ingestion rate strategy
367 var globalLimit int
368 if d.overrides.IngestionRateStrategy() == overrides.GlobalIngestionRateStrategy {
369 // note: global limit should be calculated using healthy distributors count,
370 // but we are using it in logs, instance count is good enough.
371 globalLimit = limit * d.DistributorRing.InstancesCount()
372 }
373
374 // batch size is too big if it's more than the limit and burst both
375 if tracesSize > limit && tracesSize > burst {
376 return status.Errorf(codes.ResourceExhausted,
377 "%s: batch size (%d bytes) exceeds ingestion limit (local: %d bytes/s, global: %d bytes/s, burst: %d bytes) while adding %d bytes for user %s. consider reducing batch size or increasing rate limit.",
378 overrides.ErrorPrefixRateLimited, tracesSize, limit, globalLimit, burst, tracesSize, userID)
379 }
380
381 return status.Errorf(codes.ResourceExhausted,
382 "%s: ingestion rate limit (local: %d bytes/s, global: %d bytes/s, burst: %d bytes) exceeded while adding %d bytes for user %s. consider increasing the limit or reducing ingestion rate.",
383 overrides.ErrorPrefixRateLimited, limit, globalLimit, burst, tracesSize, userID)
384 }
385
386 return nil
387}
388
389func (d *Distributor) extractBasicInfo(ctx context.Context, traces ptrace.Traces) (userID string, spanCount, tracesSize int, err error) {
390 orgID, e := validation.ExtractValidTenantID(ctx)

Callers 2

PushTracesMethod · 0.95
TestCheckForRateLimitsFunction · 0.80

Calls 5

RecordDiscardedSpansFunction · 0.92
NowMethod · 0.65
LimitMethod · 0.65
IngestionRateStrategyMethod · 0.65
BurstMethod · 0.45

Tested by 1

TestCheckForRateLimitsFunction · 0.64