| 83 | } |
| 84 | |
| 85 | func (p *writer) flush(ctx context.Context, r tempodb.Reader, w tempodb.Writer, c tempodb.Compactor) error { |
| 86 | // TODO - Retry with backoff? |
| 87 | ctx, span := tracer.Start(ctx, "writer.flush", trace.WithAttributes(attribute.Int("partition", int(p.partition)), attribute.String("section_start_time", p.startTime.String()))) |
| 88 | defer span.End() |
| 89 | |
| 90 | // Flush tenants concurrently |
| 91 | g, ctx := errgroup.WithContext(ctx) |
| 92 | g.SetLimit(flushConcurrency) |
| 93 | |
| 94 | for _, i := range p.m { |
| 95 | g.Go(func() error { |
| 96 | i := i |
| 97 | st := time.Now() |
| 98 | |
| 99 | level.Info(p.logger).Log("msg", "flushing tenant", "tenant", i.tenantID) |
| 100 | err := i.Flush(ctx, r, w, c) |
| 101 | if err != nil { |
| 102 | return err |
| 103 | } |
| 104 | level.Info(p.logger).Log("msg", "flushed tenant", "tenant", i.tenantID, "elapsed", time.Since(st)) |
| 105 | return nil |
| 106 | }) |
| 107 | } |
| 108 | return g.Wait() |
| 109 | } |
| 110 | |
| 111 | func (p *writer) allowCompaction(ctx context.Context, w tempodb.Writer) { |
| 112 | ctx, span := tracer.Start( |