(ctx context.Context, blockMetas []*backend.BlockMeta, tenantID string, compactorCfg *CompactorConfig, compactorSharder CompactorSharder, compactorOverrides CompactorOverrides)
| 235 | } |
| 236 | |
| 237 | func (rw *readerWriter) CompactWithConfig(ctx context.Context, blockMetas []*backend.BlockMeta, tenantID string, compactorCfg *CompactorConfig, compactorSharder CompactorSharder, compactorOverrides CompactorOverrides) ([]*backend.BlockMeta, error) { |
| 238 | level.Debug(rw.logger).Log("msg", "beginning compaction", "num blocks compacting", len(blockMetas)) |
| 239 | |
| 240 | // todo - add timeout? |
| 241 | ctx, span := tracer.Start(ctx, "rw.compact") |
| 242 | defer span.End() |
| 243 | |
| 244 | traceID, _ := tracing.ExtractTraceID(ctx) |
| 245 | if traceID != "" { |
| 246 | level.Info(rw.logger).Log("msg", "beginning compaction", "traceID", traceID) |
| 247 | } |
| 248 | |
| 249 | if len(blockMetas) == 0 { |
| 250 | return nil, nil |
| 251 | } |
| 252 | |
| 253 | var err error |
| 254 | startTime := time.Now() |
| 255 | |
| 256 | var totalRecords int |
| 257 | for _, blockMeta := range blockMetas { |
| 258 | level.Info(rw.logger).Log( |
| 259 | "msg", "compacting block", |
| 260 | "version", blockMeta.Version, |
| 261 | "tenantID", blockMeta.TenantID, |
| 262 | "blockID", blockMeta.BlockID.String(), |
| 263 | "startTime", blockMeta.StartTime.String(), |
| 264 | "endTime", blockMeta.EndTime.String(), |
| 265 | "totalObjects", blockMeta.TotalObjects, |
| 266 | "size", blockMeta.Size_, |
| 267 | "compactionLevel", blockMeta.CompactionLevel, |
| 268 | "totalRecords", blockMeta.TotalObjects, |
| 269 | "bloomShardCount", blockMeta.BloomShardCount, |
| 270 | "footerSize", blockMeta.FooterSize, |
| 271 | "replicationFactor", blockMeta.ReplicationFactor, |
| 272 | ) |
| 273 | totalRecords += int(blockMeta.TotalObjects) |
| 274 | |
| 275 | // Make sure block still exists |
| 276 | _, err = rw.r.BlockMeta(ctx, (uuid.UUID)(blockMeta.BlockID), tenantID) |
| 277 | if err != nil { |
| 278 | return nil, err |
| 279 | } |
| 280 | } |
| 281 | |
| 282 | enc, err := encoding.FromVersion(blockMetas[0].Version) |
| 283 | if err != nil { |
| 284 | return nil, err |
| 285 | } |
| 286 | |
| 287 | if !enc.CompactionSupported() { |
| 288 | return nil, fmt.Errorf("compaction not supported for block version %s", blockMetas[0].Version) |
| 289 | } |
| 290 | |
| 291 | compactionLevel := CompactionLevelForBlocks(blockMetas) |
| 292 | compactionLevelLabel := strconv.Itoa(int(compactionLevel)) |
| 293 | |
| 294 | opts := common.CompactionOptions{ |
no test coverage detected