checkForRateLimits checks if the trace batch size exceeds the ingestion rate limit. it will use the ingestion rate limits based on the ingestion strategy configured. LocalIngestionRateStrategy: the ingestion rate limit is applied as is in each distributor. example: if the ingestion rate limit is 10
(tracesSize, spanCount int, userID string)
| 356 | // example: if the ingestion rate limit is 10MB/s and the burst size is 20MB, and there are 5 healthy distributors, |
| 357 | // then each distributor will allow 2MB/s with a burst of 20MB. |
| 358 | func (d *Distributor) checkForRateLimits(tracesSize, spanCount int, userID string) error { |
| 359 | now := time.Now() |
| 360 | if !d.ingestionRateLimiter.AllowN(now, userID, tracesSize) { |
| 361 | overrides.RecordDiscardedSpans(spanCount, overrides.ReasonRateLimited, userID) |
| 362 | // limit: number of bytes per second allowed for the user, as per ingestion rate strategy |
| 363 | limit := int(d.ingestionRateLimiter.Limit(now, userID)) |
| 364 | burst := d.ingestionRateLimiter.Burst(now, userID) |
| 365 | |
| 366 | // globalLimit will be 0 when using local ingestion rate strategy |
| 367 | var globalLimit int |
| 368 | if d.overrides.IngestionRateStrategy() == overrides.GlobalIngestionRateStrategy { |
| 369 | // note: global limit should be calculated using healthy distributors count, |
| 370 | // but we are using it in logs, instance count is good enough. |
| 371 | globalLimit = limit * d.DistributorRing.InstancesCount() |
| 372 | } |
| 373 | |
| 374 | // batch size is too big if it's more than the limit and burst both |
| 375 | if tracesSize > limit && tracesSize > burst { |
| 376 | return status.Errorf(codes.ResourceExhausted, |
| 377 | "%s: batch size (%d bytes) exceeds ingestion limit (local: %d bytes/s, global: %d bytes/s, burst: %d bytes) while adding %d bytes for user %s. consider reducing batch size or increasing rate limit.", |
| 378 | overrides.ErrorPrefixRateLimited, tracesSize, limit, globalLimit, burst, tracesSize, userID) |
| 379 | } |
| 380 | |
| 381 | return status.Errorf(codes.ResourceExhausted, |
| 382 | "%s: ingestion rate limit (local: %d bytes/s, global: %d bytes/s, burst: %d bytes) exceeded while adding %d bytes for user %s. consider increasing the limit or reducing ingestion rate.", |
| 383 | overrides.ErrorPrefixRateLimited, limit, globalLimit, burst, tracesSize, userID) |
| 384 | } |
| 385 | |
| 386 | return nil |
| 387 | } |
| 388 | |
| 389 | func (d *Distributor) extractBasicInfo(ctx context.Context, traces ptrace.Traces) (userID string, spanCount, tracesSize int, err error) { |
| 390 | orgID, e := validation.ExtractValidTenantID(ctx) |