(ctx context.Context, block *streamingBlock, l log.Logger)
| 223 | } |
| 224 | |
| 225 | func (c *Compactor) appendBlock(ctx context.Context, block *streamingBlock, l log.Logger) error { |
| 226 | _, span := tracer.Start(ctx, "vparquet.compactor.appendBlock") |
| 227 | defer span.End() |
| 228 | |
| 229 | var ( |
| 230 | objs = block.CurrentBufferedObjects() |
| 231 | vals = block.EstimatedBufferedBytes() |
| 232 | compactionLevel = int(block.meta.CompactionLevel - 1) |
| 233 | ) |
| 234 | |
| 235 | if c.opts.ObjectsWritten != nil { |
| 236 | c.opts.ObjectsWritten(compactionLevel, objs) |
| 237 | } |
| 238 | |
| 239 | bytesFlushed, err := block.Flush() |
| 240 | if err != nil { |
| 241 | return err |
| 242 | } |
| 243 | |
| 244 | if c.opts.BytesWritten != nil { |
| 245 | c.opts.BytesWritten(compactionLevel, bytesFlushed) |
| 246 | } |
| 247 | |
| 248 | level.Info(l).Log("msg", "flushed to block", "bytes", bytesFlushed, "objects", objs, "values", vals) |
| 249 | |
| 250 | return nil |
| 251 | } |
| 252 | |
| 253 | func (c *Compactor) finishBlock(ctx context.Context, block *streamingBlock, l log.Logger) error { |
| 254 | _, span := tracer.Start(ctx, "vparquet.compactor.finishBlock") |
no test coverage detected