(inst *instance)
| 213 | } |
| 214 | |
| 215 | func (s *LiveStore) perTenantCleanupLoop(inst *instance) { |
| 216 | // ticker |
| 217 | ticker := time.NewTicker(s.cfg.InstanceCleanupPeriod) |
| 218 | defer ticker.Stop() |
| 219 | |
| 220 | // Reclaim at a fraction of the grace window so blocks are deleted |
| 221 | // soon after expiry, not at the next InstanceCleanupPeriod tick. |
| 222 | reclaimInterval := s.cfg.BlockReclaimGrace / 4 |
| 223 | if reclaimInterval < time.Second { |
| 224 | reclaimInterval = time.Second |
| 225 | } |
| 226 | reclaimTicker := time.NewTicker(reclaimInterval) |
| 227 | defer reclaimTicker.Stop() |
| 228 | |
| 229 | for { |
| 230 | select { |
| 231 | case <-ticker.C: |
| 232 | // dump any blocks that have been flushed for a while |
| 233 | err := inst.deleteOldBlocks() |
| 234 | if err != nil { |
| 235 | level.Error(s.logger).Log("msg", "failed to delete old blocks", "err", err) |
| 236 | } |
| 237 | case <-reclaimTicker.C: |
| 238 | for _, r := range inst.reclaim.reclaim() { |
| 239 | if r.Err != nil { |
| 240 | level.Error(s.logger).Log("msg", "reclaim failed", "tenant", r.Tenant, "block_id", r.BlockID.String(), "block_type", r.BlockType, "err", r.Err) |
| 241 | continue |
| 242 | } |
| 243 | metricBlocksClearedTotal.WithLabelValues(r.BlockType).Inc() |
| 244 | level.Info(s.logger).Log("msg", "reclaimed block", "tenant", r.Tenant, "block_id", r.BlockID.String(), "block_type", r.BlockType) |
| 245 | } |
| 246 | case <-s.ctx.Done(): |
| 247 | return |
| 248 | } |
| 249 | } |
| 250 | } |
| 251 | |
| 252 | func (s *LiveStore) enqueueCompleteOp(tenantID string, blockID uuid.UUID, jitter bool) error { |
| 253 | op := &completeOp{ |
no test coverage detected