| 140 | } |
| 141 | |
| 142 | func newStreamingBlock(ctx context.Context, cfg *common.BlockConfig, meta *backend.BlockMeta, r backend.Reader, to backend.Writer, createBufferedWriter func(w io.Writer) tempo_io.BufferedWriteFlusher) (*streamingBlock, *backend.BlockMeta) { |
| 143 | newMeta := backend.NewBlockMeta(meta.TenantID, (uuid.UUID)(meta.BlockID), VersionString) |
| 144 | newMeta.StartTime = meta.StartTime |
| 145 | newMeta.EndTime = meta.EndTime |
| 146 | newMeta.ReplicationFactor = meta.ReplicationFactor |
| 147 | newMeta.DedicatedColumns = filterDedicatedColumns(meta.DedicatedColumns) |
| 148 | |
| 149 | // TotalObjects is used here an an estimated count for the bloom filter. |
| 150 | // The real number of objects is tracked below. |
| 151 | bloom := common.NewBloom(cfg.BloomFP, uint(cfg.BloomShardSizeBytes), uint(meta.TotalObjects)) |
| 152 | |
| 153 | var ( |
| 154 | w = &backendWriter{ctx, to, DataFileName, (uuid.UUID)(meta.BlockID), meta.TenantID, nil} |
| 155 | bw = createBufferedWriter(w) |
| 156 | _, writerOptions, _ = SchemaWithDynamicChanges(meta.DedicatedColumns) |
| 157 | pw = parquet.NewGenericWriter[*Trace](bw, writerOptions...) |
| 158 | ) |
| 159 | |
| 160 | return &streamingBlock{ |
| 161 | ctx: ctx, |
| 162 | meta: newMeta, |
| 163 | bloom: bloom, |
| 164 | bw: bw, |
| 165 | pw: pw, |
| 166 | w: w, |
| 167 | r: r, |
| 168 | to: to, |
| 169 | index: &index{}, |
| 170 | withNoCompactFlag: cfg.CreateWithNoCompactFlag, |
| 171 | }, newMeta |
| 172 | } |
| 173 | |
| 174 | func (b *streamingBlock) Add(tr *Trace, start, end uint32) error { |
| 175 | _, err := b.pw.Write([]*Trace{tr}) |