(id common.ID, trace *tempopb.Trace, start, end uint32, adjustIngestionSlack bool)
| 340 | } |
| 341 | |
| 342 | func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end uint32, adjustIngestionSlack bool) error { |
| 343 | var connected bool |
| 344 | b.buffer, connected = traceToParquet(b.meta, id, trace, b.buffer) |
| 345 | if !connected { |
| 346 | dataquality.WarnDisconnectedTrace(b.meta.TenantID, dataquality.PhaseTraceFlushedToWal) |
| 347 | } |
| 348 | if b.buffer != nil && b.buffer.RootSpanName == "" { |
| 349 | dataquality.WarnRootlessTrace(b.meta.TenantID, dataquality.PhaseTraceFlushedToWal) |
| 350 | } |
| 351 | |
| 352 | if adjustIngestionSlack { |
| 353 | start, end = common.AdjustTimeRangeForSlack(b.meta.TenantID, b.ingestionSlack, start, end) |
| 354 | } |
| 355 | |
| 356 | // add to current |
| 357 | _, err := b.writer.Write([]*Trace{b.buffer}) |
| 358 | if err != nil { |
| 359 | return fmt.Errorf("error writing row: %w", err) |
| 360 | } |
| 361 | |
| 362 | b.mtx.Lock() |
| 363 | defer b.mtx.Unlock() |
| 364 | b.meta.ObjectAdded(start, end) |
| 365 | b.ids.Set(id, int64(b.ids.Len())) // Next row number |
| 366 | b.unflushedSize += int64(estimateMarshalledSizeFromTrace(b.buffer)) |
| 367 | |
| 368 | return nil |
| 369 | } |
| 370 | |
| 371 | func (b *walBlock) IngestionSlack() time.Duration { |
| 372 | return b.ingestionSlack |
no test coverage detected