(id common.ID, trace *tempopb.Trace, start, end uint32, adjustIngestionSlack bool)
| 354 | } |
| 355 | |
| 356 | func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end uint32, adjustIngestionSlack bool) error { |
| 357 | var connected bool |
| 358 | b.buffer, connected = traceToParquetWithMapping(id, trace, b.buffer, b.dedcolsRes, b.dedcolsSpan) |
| 359 | if !connected { |
| 360 | dataquality.WarnDisconnectedTrace(b.meta.TenantID, dataquality.PhaseTraceFlushedToWal) |
| 361 | } |
| 362 | if b.buffer != nil && b.buffer.RootSpanName == "" { |
| 363 | dataquality.WarnRootlessTrace(b.meta.TenantID, dataquality.PhaseTraceFlushedToWal) |
| 364 | } |
| 365 | |
| 366 | if adjustIngestionSlack { |
| 367 | start, end = common.AdjustTimeRangeForSlack(b.meta.TenantID, b.ingestionSlack, start, end) |
| 368 | } |
| 369 | |
| 370 | // add to current |
| 371 | _, err := b.writer.Write([]*Trace{b.buffer}) |
| 372 | if err != nil { |
| 373 | return fmt.Errorf("error writing row: %w", err) |
| 374 | } |
| 375 | |
| 376 | b.mtx.Lock() |
| 377 | defer b.mtx.Unlock() |
| 378 | b.meta.ObjectAdded(start, end) |
| 379 | b.ids.Set(id, int64(b.ids.Len())) // Next row number |
| 380 | b.unflushedSize += int64(estimateMarshalledSizeFromTrace(b.buffer)) |
| 381 | |
| 382 | return nil |
| 383 | } |
| 384 | |
| 385 | // TODO: potentially add validation to wal blocks and use in the wal replay code in the ingester. |
| 386 | func (b *walBlock) Validate(context.Context) error { |
no test coverage detected