| 251 | } |
| 252 | |
| 253 | func (c *Compactor) finishBlock(ctx context.Context, block *streamingBlock, l log.Logger) error { |
| 254 | _, span := tracer.Start(ctx, "vparquet.compactor.finishBlock") |
| 255 | defer span.End() |
| 256 | |
| 257 | bytesFlushed, err := block.Complete() |
| 258 | if err != nil { |
| 259 | return fmt.Errorf("error completing block: %w", err) |
| 260 | } |
| 261 | |
| 262 | level.Info(l).Log("msg", "wrote compacted block", |
| 263 | "version", block.meta.Version, |
| 264 | "tenantID", block.meta.TenantID, |
| 265 | "blockID", block.meta.BlockID.String(), |
| 266 | "startTime", block.meta.StartTime.String(), |
| 267 | "endTime", block.meta.EndTime.String(), |
| 268 | "totalObjects", block.meta.TotalObjects, |
| 269 | "size", block.meta.Size_, |
| 270 | "compactionLevel", block.meta.CompactionLevel, |
| 271 | "totalRecords", block.meta.TotalObjects, |
| 272 | "bloomShardCount", block.meta.BloomShardCount, |
| 273 | "footerSize", block.meta.FooterSize, |
| 274 | "replicationFactor", block.meta.ReplicationFactor, |
| 275 | "dedicatedColumns", fmt.Sprintf("%+v", block.meta.DedicatedColumns), |
| 276 | ) |
| 277 | |
| 278 | compactionLevel := int(block.meta.CompactionLevel) - 1 |
| 279 | if c.opts.BytesWritten != nil { |
| 280 | c.opts.BytesWritten(compactionLevel, bytesFlushed) |
| 281 | } |
| 282 | return nil |
| 283 | } |
| 284 | |
| 285 | type rowPool struct { |
| 286 | pool sync.Pool |