(ctx context.Context, r backend.Reader, w backend.Writer, meta *backend.BlockMeta, traceIDs []common.ID, logger log.Logger)
| 101 | } |
| 102 | |
| 103 | func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta *backend.BlockMeta, traceIDs []common.ID, logger log.Logger) (*backend.BlockMeta, error) { |
| 104 | enc, err := encoding.FromVersion(meta.Version) |
| 105 | if err != nil { |
| 106 | return nil, fmt.Errorf("error getting encoder: %w", err) |
| 107 | } |
| 108 | |
| 109 | // todo: provide a way to pass a config in. this just uses defaults which is fine for now |
| 110 | opts := common.CompactionOptions{ |
| 111 | BlockConfig: common.BlockConfig{ |
| 112 | // defaults should be fine for just recreating a few blocks |
| 113 | BloomFP: common.DefaultBloomFP, |
| 114 | BloomShardSizeBytes: common.DefaultBloomShardSizeBytes, |
| 115 | Version: meta.Version, |
| 116 | |
| 117 | // parquet fields |
| 118 | RowGroupSizeBytes: 100_000_000, // default |
| 119 | |
| 120 | // vParquet3 fields |
| 121 | DedicatedColumns: meta.DedicatedColumns, |
| 122 | }, |
| 123 | OutputBlocks: 1, |
| 124 | MaxBytesPerTrace: 0, // disable for this process |
| 125 | |
| 126 | // hook to drop the trace |
| 127 | DropObject: func(id common.ID) bool { |
| 128 | for _, tid := range traceIDs { |
| 129 | if bytes.Equal(id, tid) { |
| 130 | level.Info(logger).Log("msg", "dropping trace", "traceID", util.TraceIDToHexString(id)) |
| 131 | return true |
| 132 | } |
| 133 | } |
| 134 | return false |
| 135 | }, |
| 136 | |
| 137 | // setting to prevent panics. should we track and report these? |
| 138 | BytesWritten: func(_, _ int) {}, |
| 139 | ObjectsCombined: func(_, _ int) {}, |
| 140 | ObjectsWritten: func(_, _ int) {}, |
| 141 | SpansDiscarded: func(_, _, _ string, _ int) {}, |
| 142 | DisconnectedTrace: func() {}, |
| 143 | RootlessTrace: func() {}, |
| 144 | DedupedSpans: func(_, _ int) {}, |
| 145 | } |
| 146 | |
| 147 | compactor := enc.NewCompactor(opts) |
| 148 | |
| 149 | level.Info(logger).Log("msg", "beginning compaction logs") |
| 150 | out, err := compactor.Compact(ctx, logger, r, w, []*backend.BlockMeta{meta}) |
| 151 | level.Info(logger).Log("msg", "ending compaction logs") |
| 152 | if err != nil { |
| 153 | return nil, err |
| 154 | } |
| 155 | |
| 156 | if len(out) == 0 { |
| 157 | return nil, nil |
| 158 | } |
| 159 | |
| 160 | if len(out) != 1 { |
no test coverage detected