MCPcopy
hub / github.com/grafana/tempo / Compact

Method Compact

tempodb/encoding/vparquet3/compactor.go:30–223  ·  view source on GitHub ↗
(ctx context.Context, l log.Logger, r backend.Reader, w backend.Writer, inputs []*backend.BlockMeta)

Source from the content-addressed store, hash-verified

28}
29
30func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader, w backend.Writer, inputs []*backend.BlockMeta) (newCompactedBlocks []*backend.BlockMeta, err error) {
31 var (
32 compactionLevel uint32
33 totalRecords int64
34 minBlockStart time.Time
35 maxBlockEnd time.Time
36 bookmarks = make([]*bookmark[parquet.Row], 0, len(inputs))
37 // MaxBytesPerTrace is the largest trace that can be expected, and assumes 1 byte per value on average (same as flushing).
38 // Divide by 4 to presumably require 2 slice allocations if we ever see a trace this large
39 pool = newRowPool(c.opts.MaxBytesPerTrace / 4)
40 )
41 for _, blockMeta := range inputs {
42 totalRecords += blockMeta.TotalObjects
43
44 if blockMeta.CompactionLevel > compactionLevel {
45 compactionLevel = blockMeta.CompactionLevel
46 }
47
48 if blockMeta.StartTime.Before(minBlockStart) || minBlockStart.IsZero() {
49 minBlockStart = blockMeta.StartTime
50 }
51 if blockMeta.EndTime.After(maxBlockEnd) {
52 maxBlockEnd = blockMeta.EndTime
53 }
54
55 block := newBackendBlock(blockMeta, r)
56
57 derivedCtx, span := tracer.Start(ctx, "vparquet.compactor.iterator")
58 defer span.End()
59
60 iter, err := block.rawIter(derivedCtx, pool)
61 if err != nil {
62 return nil, err
63 }
64
65 bookmarks = append(bookmarks, newBookmark[parquet.Row](iter))
66 }
67
68 var (
69 nextCompactionLevel = compactionLevel + 1
70 sch = parquet.SchemaOf(new(Trace))
71 )
72
73 // Dedupe rows and also call the metrics callback.
74 combine := func(rows []parquet.Row) (parquet.Row, error) {
75 if len(rows) == 0 {
76 return nil, nil
77 }
78
79 if len(rows) == 1 {
80 return rows[0], nil
81 }
82
83 isEqual := true
84 for i := 1; i < len(rows) && isEqual; i++ {
85 isEqual = rows[0].Equal(rows[i])
86 }
87 if isEqual {

Callers 3

benchmarkCompactor · Function · 0.95
BenchmarkCompactorDupes · Function · 0.95
TestCompact · Function · 0.95

Calls 15

ConsumeWithFinal · Method · 0.95
Result · Method · 0.95
appendBlock · Method · 0.95
finishBlock · Method · 0.95
NewUUID · Function · 0.92
newRowPool · Function · 0.70
newBackendBlock · Function · 0.70
countSpans · Function · 0.70
NewCombiner · Function · 0.70
newMultiblockIterator · Function · 0.70
newStreamingBlock · Function · 0.70

Tested by 3

benchmarkCompactor · Function · 0.76
BenchmarkCompactorDupes · Function · 0.76
TestCompact · Function · 0.76