MCPcopy
hub / github.com/grafana/tempo / TestLiveStoreConsumeDropsOldRecords

Function TestLiveStoreConsumeDropsOldRecords

modules/livestore/live_store_test.go:461–512  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

459}
460
461func TestLiveStoreConsumeDropsOldRecords(t *testing.T) {
462 // default live store uses the default complete block timeout
463 ls, _ := defaultLiveStore(t, t.TempDir())
464
465 // Reset metrics
466 metricRecordsProcessed.Reset()
467 metricRecordsDropped.Reset()
468
469 now := time.Now()
470 older := now.Add(-1 * (defaultCompleteBlockTimeout + time.Second))
471 newer := now.Add(-1 * (defaultCompleteBlockTimeout - time.Second))
472
473 // Create test records - some old, some new
474 records := []*kgo.Record{
475 {
476 Key: []byte("tenant1"),
477 Timestamp: older, // Too old (older than CompleteBlockTimeout)
478 Value: createValidPushRequest(t),
479 },
480 {
481 Key: []byte("tenant1"),
482 Timestamp: newer, // Valid (newer than CompleteBlockTimeout)
483 Value: createValidPushRequest(t),
484 },
485 {
486 Key: []byte("tenant2"),
487 Timestamp: older, // Too old
488 Value: createValidPushRequest(t),
489 },
490 {
491 Key: []byte("tenant2"),
492 Timestamp: newer, // Valid
493 Value: createValidPushRequest(t),
494 },
495 }
496
497 // Call consume
498 _, err := ls.consume(context.Background(), createRecordIter(records), now)
499 require.NoError(t, err)
500
501 // Verify metrics
502 // Should have processed 2 valid records (1 per tenant)
503 require.Equal(t, float64(1), test.MustGetCounterValue(metricRecordsProcessed.WithLabelValues("tenant1")))
504 require.Equal(t, float64(1), test.MustGetCounterValue(metricRecordsProcessed.WithLabelValues("tenant2")))
505
506 // Should have dropped 2 old records (1 per tenant)
507 require.Equal(t, float64(1), test.MustGetCounterValue(metricRecordsDropped.WithLabelValues("tenant1", "too_old")))
508 require.Equal(t, float64(1), test.MustGetCounterValue(metricRecordsDropped.WithLabelValues("tenant2", "too_old")))
509
510 err = services.StopAndAwaitTerminated(t.Context(), ls)
511 require.NoError(t, err)
512}
513
514// TestLiveStoreConsumeTracksPartitionLag verifies that partition lag is tracked
515// for all records regardless of outcome: too old (dropped), decode failure, or

Callers

nothing calls this directly

Calls 10

MustGetCounterValueFunction · 0.92
defaultLiveStoreFunction · 0.85
createValidPushRequestFunction · 0.85
createRecordIterFunction · 0.85
ResetMethod · 0.65
NowMethod · 0.65
AddMethod · 0.65
ContextMethod · 0.65
consumeMethod · 0.45
EqualMethod · 0.45

Tested by

no test coverage detected