| 46 | } |
| 47 | |
| 48 | func (p *RetentionProvider) Start(ctx context.Context) <-chan *work.Job { |
| 49 | jobs := make(chan *work.Job, 1) |
| 50 | |
| 51 | go func() { |
| 52 | defer close(jobs) |
| 53 | ticker := time.NewTicker(p.cfg.Interval) |
| 54 | defer ticker.Stop() |
| 55 | |
| 56 | level.Info(p.logger).Log("msg", "retention provider started") |
| 57 | |
| 58 | for { |
| 59 | select { |
| 60 | case <-ctx.Done(): |
| 61 | level.Info(p.logger).Log("msg", "retention provider stopping") |
| 62 | return |
| 63 | case <-ticker.C: |
| 64 | p.emitRetentionJobs(ctx, jobs) |
| 65 | } |
| 66 | } |
| 67 | }() |
| 68 | |
| 69 | return jobs |
| 70 | } |
| 71 | |
| 72 | // emitRetentionJobs sends one retention job per eligible tenant into jobs. |
| 73 | // It uses a blocking send per job so all tenants are served within one tick |