ConsumeTraces implements consumer.Traces
(ctx context.Context, td ptrace.Traces)
| 343 | |
| 344 | // ConsumeTraces implements consumer.Traces |
| 345 | func (r *receiversShim) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { |
| 346 | ctx, span := tracer.Start(ctx, "distributor.ConsumeTraces") |
| 347 | defer span.End() |
| 348 | |
| 349 | tenant, tenantErr := user.ExtractOrgID(ctx) |
| 350 | if tenantErr == nil { |
| 351 | span.SetAttributes(attribute.String("orgID", tenant)) |
| 352 | } |
| 353 | |
| 354 | var err error |
| 355 | |
| 356 | start := time.Now() |
| 357 | _, err = r.pusher.PushTraces(ctx, td) |
| 358 | metricPushDuration.Observe(time.Since(start).Seconds()) |
| 359 | if err != nil { |
| 360 | if tenantErr == nil { |
| 361 | r.logger.Log("msg", "pusher failed to consume trace data", "tenant", tenant, "err", err) |
| 362 | } else { |
| 363 | r.logger.Log("msg", "pusher failed to consume trace data", "err", err) |
| 364 | } |
| 365 | span.SetStatus(otelcodes.Error, err.Error()) |
| 366 | retryInfoEnabled, e := r.pusher.RetryInfoEnabled(ctx) |
| 367 | if e != nil { |
| 368 | return e |
| 369 | } |
| 370 | err = wrapErrorIfRetryable(err, r.retryDelay, retryInfoEnabled) |
| 371 | } |
| 372 | |
| 373 | return err |
| 374 | } |
| 375 | |
| 376 | // GetExtensions implements component.Host |
| 377 | func (r *receiversShim) GetExtensions() map[component.ID]component.Component { |