| 266 | } |
| 267 | |
| 268 | func (c *Compactor) finishBlock(ctx context.Context, block *streamingBlock, l log.Logger) error { |
| 269 | _, span := tracer.Start(ctx, "vparquet.compactor.finishBlock") |
| 270 | defer span.End() |
| 271 | |
| 272 | bytesFlushed, err := block.Complete() |
| 273 | if err != nil { |
| 274 | return fmt.Errorf("error completing block: %w", err) |
| 275 | } |
| 276 | |
| 277 | level.Info(l).Log("msg", "wrote compacted block", |
| 278 | "version", block.meta.Version, |
| 279 | "tenantID", block.meta.TenantID, |
| 280 | "blockID", block.meta.BlockID.String(), |
| 281 | "startTime", block.meta.StartTime.String(), |
| 282 | "endTime", block.meta.EndTime.String(), |
| 283 | "totalObjects", block.meta.TotalObjects, |
| 284 | "size", block.meta.Size_, |
| 285 | "compactionLevel", block.meta.CompactionLevel, |
| 286 | "totalRecords", block.meta.TotalObjects, |
| 287 | "bloomShardCount", block.meta.BloomShardCount, |
| 288 | "footerSize", block.meta.FooterSize, |
| 289 | "replicationFactor", block.meta.ReplicationFactor, |
| 290 | "dedicatedColumns", fmt.Sprintf("%+v", block.meta.DedicatedColumns), |
| 291 | ) |
| 292 | |
| 293 | span.AddEvent("wrote compacted block") |
| 294 | span.SetAttributes( |
| 295 | attribute.String("blockID", block.meta.BlockID.String()), |
| 296 | ) |
| 297 | |
| 298 | compactionLevel := int(block.meta.CompactionLevel) - 1 |
| 299 | if c.opts.BytesWritten != nil { |
| 300 | c.opts.BytesWritten(compactionLevel, bytesFlushed) |
| 301 | } |
| 302 | return nil |
| 303 | } |
| 304 | |
| 305 | type rowPool struct { |
| 306 | pool sync.Pool |