(ctx context.Context, block *streamingBlock, l log.Logger)
| 238 | } |
| 239 | |
| 240 | func (c *Compactor) appendBlock(ctx context.Context, block *streamingBlock, l log.Logger) error { |
| 241 | _, span := tracer.Start(ctx, "vparquet.compactor.appendBlock") |
| 242 | defer span.End() |
| 243 | |
| 244 | var ( |
| 245 | objs = block.CurrentBufferedObjects() |
| 246 | vals = block.EstimatedBufferedBytes() |
| 247 | compactionLevel = int(block.meta.CompactionLevel - 1) |
| 248 | ) |
| 249 | |
| 250 | if c.opts.ObjectsWritten != nil { |
| 251 | c.opts.ObjectsWritten(compactionLevel, objs) |
| 252 | } |
| 253 | |
| 254 | bytesFlushed, err := block.Flush() |
| 255 | if err != nil { |
| 256 | return err |
| 257 | } |
| 258 | |
| 259 | if c.opts.BytesWritten != nil { |
| 260 | c.opts.BytesWritten(compactionLevel, bytesFlushed) |
| 261 | } |
| 262 | |
| 263 | level.Info(l).Log("msg", "flushed to block", "bytes", bytesFlushed, "objects", objs, "values", vals) |
| 264 | |
| 265 | return nil |
| 266 | } |
| 267 | |
| 268 | func (c *Compactor) finishBlock(ctx context.Context, block *streamingBlock, l log.Logger) error { |
| 269 | _, span := tracer.Start(ctx, "vparquet.compactor.finishBlock") |
no test coverage detected