MCPcopy
hub / github.com/grafana/tempo / CompactWithConfig

Method CompactWithConfig

tempodb/compactor.go:237–348  ·  view source on GitHub ↗
(ctx context.Context, blockMetas []*backend.BlockMeta, tenantID string, compactorCfg *CompactorConfig, compactorSharder CompactorSharder, compactorOverrides CompactorOverrides)

Source from the content-addressed store, hash-verified

235}
236
237func (rw *readerWriter) CompactWithConfig(ctx context.Context, blockMetas []*backend.BlockMeta, tenantID string, compactorCfg *CompactorConfig, compactorSharder CompactorSharder, compactorOverrides CompactorOverrides) ([]*backend.BlockMeta, error) {
238 level.Debug(rw.logger).Log("msg", "beginning compaction", "num blocks compacting", len(blockMetas))
239
240 // todo - add timeout?
241 ctx, span := tracer.Start(ctx, "rw.compact")
242 defer span.End()
243
244 traceID, _ := tracing.ExtractTraceID(ctx)
245 if traceID != "" {
246 level.Info(rw.logger).Log("msg", "beginning compaction", "traceID", traceID)
247 }
248
249 if len(blockMetas) == 0 {
250 return nil, nil
251 }
252
253 var err error
254 startTime := time.Now()
255
256 var totalRecords int
257 for _, blockMeta := range blockMetas {
258 level.Info(rw.logger).Log(
259 "msg", "compacting block",
260 "version", blockMeta.Version,
261 "tenantID", blockMeta.TenantID,
262 "blockID", blockMeta.BlockID.String(),
263 "startTime", blockMeta.StartTime.String(),
264 "endTime", blockMeta.EndTime.String(),
265 "totalObjects", blockMeta.TotalObjects,
266 "size", blockMeta.Size_,
267 "compactionLevel", blockMeta.CompactionLevel,
268 "totalRecords", blockMeta.TotalObjects,
269 "bloomShardCount", blockMeta.BloomShardCount,
270 "footerSize", blockMeta.FooterSize,
271 "replicationFactor", blockMeta.ReplicationFactor,
272 )
273 totalRecords += int(blockMeta.TotalObjects)
274
275 // Make sure block still exists
276 _, err = rw.r.BlockMeta(ctx, (uuid.UUID)(blockMeta.BlockID), tenantID)
277 if err != nil {
278 return nil, err
279 }
280 }
281
282 enc, err := encoding.FromVersion(blockMetas[0].Version)
283 if err != nil {
284 return nil, err
285 }
286
287 if !enc.CompactionSupported() {
288 return nil, fmt.Errorf("compaction not supported for block version %s", blockMetas[0].Version)
289 }
290
291 compactionLevel := CompactionLevelForBlocks(blockMetas)
292 compactionLevelLabel := strconv.Itoa(int(compactionLevel))
293
294 opts := common.CompactionOptions{

Callers 1

compactOneJob (Method) · 0.95

Calls 15

Compact (Method) · 0.95
ExtractTraceID (Function) · 0.92
FromVersion (Function) · 0.92
WarnDisconnectedTrace (Function) · 0.92
WarnRootlessTrace (Function) · 0.92
CompactionLevelForBlocks (Function) · 0.85
markCompacted (Function) · 0.85
Log (Method) · 0.65
Start (Method) · 0.65
Now (Method) · 0.65
BlockMeta (Method) · 0.65
CompactionSupported (Method) · 0.65

Tested by

no test coverage detected