(ctx context.Context, tenantID string, compactorCfg *CompactorConfig, compactorSharder CompactorSharder, compactorOverrides CompactorOverrides)
| 66 | } |
| 67 | |
| 68 | func (rw *readerWriter) retainTenant(ctx context.Context, tenantID string, compactorCfg *CompactorConfig, compactorSharder CompactorSharder, compactorOverrides CompactorOverrides) { |
| 69 | start := time.Now() |
| 70 | defer func() { metricRetentionDuration.Observe(time.Since(start).Seconds()) }() |
| 71 | |
| 72 | // Check for overrides |
| 73 | retention := compactorCfg.BlockRetention // Default |
| 74 | if r := compactorOverrides.BlockRetentionForTenant(tenantID); r != 0 { |
| 75 | retention = r |
| 76 | } |
| 77 | level.Debug(rw.logger).Log("msg", "Performing block retention", "tenantID", tenantID, "retention", retention) |
| 78 | |
| 79 | // iterate through block list. mark as compacted anything that is past retention. |
| 80 | cutoff := time.Now().Add(-retention) |
| 81 | blocklist := rw.blocklist.Metas(tenantID) |
| 82 | for _, b := range blocklist { |
| 83 | select { |
| 84 | case <-ctx.Done(): |
| 85 | return |
| 86 | default: |
| 87 | if b.EndTime.Before(cutoff) && compactorSharder.Owns(b.BlockID.String()) { |
| 88 | level.Info(rw.logger).Log("msg", "marking block for deletion", "blockID", b.BlockID, "tenantID", tenantID) |
| 89 | err := rw.c.MarkBlockCompacted((uuid.UUID)(b.BlockID), tenantID) |
| 90 | if err != nil { |
| 91 | level.Error(rw.logger).Log("msg", "failed to mark block compacted during retention", "blockID", b.BlockID, "tenantID", tenantID, "err", err) |
| 92 | metricRetentionErrors.Inc() |
| 93 | } else { |
| 94 | metricMarkedForDeletion.Inc() |
| 95 | |
| 96 | rw.blocklist.Update(tenantID, nil, []*backend.BlockMeta{b}, []*backend.CompactedBlockMeta{ |
| 97 | { |
| 98 | BlockMeta: *b, |
| 99 | CompactedTime: time.Now(), |
| 100 | }, |
| 101 | }, nil) |
| 102 | } |
| 103 | } |
| 104 | } |
| 105 | } |
| 106 | |
| 107 | // iterate through compacted list looking for blocks ready to be cleared |
| 108 | cutoff = time.Now().Add(-compactorCfg.CompactedBlockRetention) |
| 109 | compactedBlocklist := rw.blocklist.CompactedMetas(tenantID) |
| 110 | for _, b := range compactedBlocklist { |
| 111 | select { |
| 112 | case <-ctx.Done(): |
| 113 | return |
| 114 | default: |
| 115 | level.Debug(rw.logger).Log("owns", compactorSharder.Owns(b.BlockID.String()), "blockID", b.BlockID, "tenantID", tenantID) |
| 116 | if b.CompactedTime.Before(cutoff) && compactorSharder.Owns(b.BlockID.String()) { |
| 117 | level.Info(rw.logger).Log("msg", "deleting block", "blockID", b.BlockID, "tenantID", tenantID) |
| 118 | err := rw.c.ClearBlock((uuid.UUID)(b.BlockID), tenantID) |
| 119 | if err != nil { |
| 120 | level.Error(rw.logger).Log("msg", "failed to clear compacted block during retention", "blockID", b.BlockID, "tenantID", tenantID, "err", err) |
| 121 | metricRetentionErrors.Inc() |
| 122 | } else { |
| 123 | metricDeleted.Inc() |
| 124 | |
| 125 | rw.blocklist.Update(tenantID, nil, nil, nil, []*backend.CompactedBlockMeta{b}) |
no test coverage detected