MCPcopy
hub / github.com/grafana/tempo / compactOneTenant

Method compactOneTenant

tempodb/compactor.go:90–180  ·  view source on GitHub ↗

compactOneTenant runs a compaction cycle every 30s

(ctx context.Context)

Source from the content-addressed store, hash-verified

88
89// compactOneTenant runs a compaction cycle every 30s
90func (rw *readerWriter) compactOneTenant(ctx context.Context) {
91 // List of all tenants in the block list
92 // The block list is updated by constantly polling the storage for tenant indexes and/or tenant blocks (and building the index)
93 tenants := rw.blocklist.Tenants()
94 if len(tenants) == 0 {
95 return
96 }
97
98 // Iterate through tenants each cycle
99 // Sort tenants for stability (since original map does not guarantee order)
100 sort.Slice(tenants, func(i, j int) bool { return tenants[i] < tenants[j] })
101 rw.compactorTenantOffset = (rw.compactorTenantOffset + 1) % uint(len(tenants))
102
103 // Select the next tenant to run compaction for
104 tenantID := tenants[rw.compactorTenantOffset]
105
106 // Skip compaction for tenants which have it disabled.
107 if rw.compactorOverrides.CompactionDisabledForTenant(tenantID) {
108 return
109 }
110
111 // Get the meta file of all non-compacted blocks for the given tenant
112 blocklist := rw.blocklist.Metas(tenantID)
113
114 window := rw.compactorOverrides.MaxCompactionRangeForTenant(tenantID)
115 if window == 0 {
116 window = rw.compactorCfg.MaxCompactionRange
117 }
118
119 // Select which blocks to compact.
120 //
121 // Blocks are first divided by the active compaction window (default: most recent 24h)
122 // 1. If blocks are inside the active window, they're grouped by compaction level (how many times they've been compacted).
123 // Favoring lower compaction levels, and compacting blocks only from the same tenant.
124 // 2. If blocks are outside the active window, they're grouped only by windows, ignoring compaction level.
125 // It picks more recent windows first, and compacting blocks only from the same tenant.
126 blockSelector := blockselector.NewTimeWindowBlockSelector(blocklist,
127 window,
128 rw.compactorCfg.MaxCompactionObjects,
129 rw.compactorCfg.MaxBlockBytes,
130 blockselector.DefaultMinInputBlocks,
131 blockselector.DefaultMaxInputBlocks,
132 blockselector.DefaultMaxCompactionLevel,
133 )
134
135 start := time.Now()
136
137 level.Info(rw.logger).Log("msg", "starting compaction cycle", "tenantID", tenantID, "offset", rw.compactorTenantOffset)
138 for {
139 // this context is controlled by the service manager. it being cancelled means that the process is shutting down
140 if ctx.Err() != nil {
141 level.Info(rw.logger).Log("msg", "caught context cancelled at the top of the compaction loop. bailing.", "err", ctx.Err(), "cause", context.Cause(ctx))
142 return
143 }
144
145 // Pick up to defaultMaxInputBlocks (4) blocks to compact into a single one
146 toBeCompacted, hashString := blockSelector.BlocksToCompact()
147 if len(toBeCompacted) == 0 {

Callers 2

compactionLoopMethod · 0.95

Calls 14

compactWhileOwnsMethod · 0.95
MeasureOutstandingBlocksFunction · 0.85
TenantsMethod · 0.65
NowMethod · 0.65
LogMethod · 0.65
BlocksToCompactMethod · 0.65
OwnsMethod · 0.65
ErrorMethod · 0.65
IncMethod · 0.65

Tested by 1