(id common.ID, trace *tempopb.Trace, start, end uint32, adjustIngestionSlack bool)
| 366 | } |
| 367 | |
| 368 | func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end uint32, adjustIngestionSlack bool) error { |
| 369 | var connected bool |
| 370 | b.buffer, connected = traceToParquetWithMapping(id, trace, b.buffer, b.dedcolsRes, b.dedcolsSpan, b.dedcolsEvent) |
| 371 | if !connected { |
| 372 | dataquality.WarnDisconnectedTrace(b.meta.TenantID, dataquality.PhaseTraceFlushedToWal) |
| 373 | } |
| 374 | if b.buffer != nil && b.buffer.RootSpanName == "" { |
| 375 | dataquality.WarnRootlessTrace(b.meta.TenantID, dataquality.PhaseTraceFlushedToWal) |
| 376 | } |
| 377 | |
| 378 | if adjustIngestionSlack { |
| 379 | start, end = common.AdjustTimeRangeForSlack(b.meta.TenantID, b.ingestionSlack, start, end) |
| 380 | } |
| 381 | |
| 382 | // add to current |
| 383 | _, err := b.writer.Write([]*Trace{b.buffer}) |
| 384 | if err != nil { |
| 385 | return fmt.Errorf("error writing row: %w", err) |
| 386 | } |
| 387 | |
| 388 | b.mtx.Lock() |
| 389 | defer b.mtx.Unlock() |
| 390 | b.meta.ObjectAdded(start, end) |
| 391 | b.ids.Set(id, int64(b.ids.Len())) // Next row number |
| 392 | b.unflushedSize += int64(estimateMarshalledSizeFromTrace(b.buffer)) |
| 393 | |
| 394 | return nil |
| 395 | } |
| 396 | |
| 397 | // TODO: potentially add validation to wal blocks and use in the wal replay code in the ingester. |
| 398 | func (b *walBlock) Validate(context.Context) error { |
no test coverage detected