compactOneTenant runs a compaction cycle every 30s
(ctx context.Context)
| 88 | |
| 89 | // compactOneTenant runs a compaction cycle every 30s |
| 90 | func (rw *readerWriter) compactOneTenant(ctx context.Context) { |
| 91 | // List of all tenants in the block list |
| 92 | // The block list is updated by constantly polling the storage for tenant indexes and/or tenant blocks (and building the index) |
| 93 | tenants := rw.blocklist.Tenants() |
| 94 | if len(tenants) == 0 { |
| 95 | return |
| 96 | } |
| 97 | |
| 98 | // Iterate through tenants each cycle |
| 99 | // Sort tenants for stability (since original map does not guarantee order) |
| 100 | sort.Slice(tenants, func(i, j int) bool { return tenants[i] < tenants[j] }) |
| 101 | rw.compactorTenantOffset = (rw.compactorTenantOffset + 1) % uint(len(tenants)) |
| 102 | |
| 103 | // Select the next tenant to run compaction for |
| 104 | tenantID := tenants[rw.compactorTenantOffset] |
| 105 | |
| 106 | // Skip compaction for tenants which have it disabled. |
| 107 | if rw.compactorOverrides.CompactionDisabledForTenant(tenantID) { |
| 108 | return |
| 109 | } |
| 110 | |
| 111 | // Get the meta file of all non-compacted blocks for the given tenant |
| 112 | blocklist := rw.blocklist.Metas(tenantID) |
| 113 | |
| 114 | window := rw.compactorOverrides.MaxCompactionRangeForTenant(tenantID) |
| 115 | if window == 0 { |
| 116 | window = rw.compactorCfg.MaxCompactionRange |
| 117 | } |
| 118 | |
| 119 | // Select which blocks to compact. |
| 120 | // |
| 121 | // Blocks are first divided by the active compaction window (default: most recent 24h) |
| 122 | // 1. If blocks are inside the active window, they're grouped by compaction level (how many times they've been compacted). |
| 123 | // Favoring lower compaction levels, and compacting blocks only from the same tenant. |
| 124 | // 2. If blocks are outside the active window, they're grouped only by windows, ignoring compaction level. |
| 125 | // It picks more recent windows first, and compacts blocks only from the same tenant. |
| 126 | blockSelector := blockselector.NewTimeWindowBlockSelector(blocklist, |
| 127 | window, |
| 128 | rw.compactorCfg.MaxCompactionObjects, |
| 129 | rw.compactorCfg.MaxBlockBytes, |
| 130 | blockselector.DefaultMinInputBlocks, |
| 131 | blockselector.DefaultMaxInputBlocks, |
| 132 | blockselector.DefaultMaxCompactionLevel, |
| 133 | ) |
| 134 | |
| 135 | start := time.Now() |
| 136 | |
| 137 | level.Info(rw.logger).Log("msg", "starting compaction cycle", "tenantID", tenantID, "offset", rw.compactorTenantOffset) |
| 138 | for { |
| 139 | // this context is controlled by the service manager. it being cancelled means that the process is shutting down |
| 140 | if ctx.Err() != nil { |
| 141 | level.Info(rw.logger).Log("msg", "caught context cancelled at the top of the compaction loop. bailing.", "err", ctx.Err(), "cause", context.Cause(ctx)) |
| 142 | return |
| 143 | } |
| 144 | |
| 145 | // Pick up to blockselector.DefaultMaxInputBlocks (4) blocks to compact into a single one |
| 146 | toBeCompacted, hashString := blockSelector.BlocksToCompact() |
| 147 | if len(toBeCompacted) == 0 { |