(t *testing.T)
| 459 | } |
| 460 | |
| 461 | func TestLiveStoreConsumeDropsOldRecords(t *testing.T) { |
| 462 | // default live store uses the default complete block timeout |
| 463 | ls, _ := defaultLiveStore(t, t.TempDir()) |
| 464 | |
| 465 | // Reset metrics |
| 466 | metricRecordsProcessed.Reset() |
| 467 | metricRecordsDropped.Reset() |
| 468 | |
| 469 | now := time.Now() |
| 470 | older := now.Add(-1 * (defaultCompleteBlockTimeout + time.Second)) |
| 471 | newer := now.Add(-1 * (defaultCompleteBlockTimeout - time.Second)) |
| 472 | |
| 473 | // Create test records - some old, some new |
| 474 | records := []*kgo.Record{ |
| 475 | { |
| 476 | Key: []byte("tenant1"), |
| 477 | Timestamp: older, // Too old (older than CompleteBlockTimeout) |
| 478 | Value: createValidPushRequest(t), |
| 479 | }, |
| 480 | { |
| 481 | Key: []byte("tenant1"), |
| 482 | Timestamp: newer, // Valid (newer than CompleteBlockTimeout) |
| 483 | Value: createValidPushRequest(t), |
| 484 | }, |
| 485 | { |
| 486 | Key: []byte("tenant2"), |
| 487 | Timestamp: older, // Too old |
| 488 | Value: createValidPushRequest(t), |
| 489 | }, |
| 490 | { |
| 491 | Key: []byte("tenant2"), |
| 492 | Timestamp: newer, // Valid |
| 493 | Value: createValidPushRequest(t), |
| 494 | }, |
| 495 | } |
| 496 | |
| 497 | // Call consume |
| 498 | _, err := ls.consume(context.Background(), createRecordIter(records), now) |
| 499 | require.NoError(t, err) |
| 500 | |
| 501 | // Verify metrics |
| 502 | // Should have processed 2 valid records (1 per tenant) |
| 503 | require.Equal(t, float64(1), test.MustGetCounterValue(metricRecordsProcessed.WithLabelValues("tenant1"))) |
| 504 | require.Equal(t, float64(1), test.MustGetCounterValue(metricRecordsProcessed.WithLabelValues("tenant2"))) |
| 505 | |
| 506 | // Should have dropped 2 old records (1 per tenant) |
| 507 | require.Equal(t, float64(1), test.MustGetCounterValue(metricRecordsDropped.WithLabelValues("tenant1", "too_old"))) |
| 508 | require.Equal(t, float64(1), test.MustGetCounterValue(metricRecordsDropped.WithLabelValues("tenant2", "too_old"))) |
| 509 | |
| 510 | err = services.StopAndAwaitTerminated(t.Context(), ls) |
| 511 | require.NoError(t, err) |
| 512 | } |
| 513 | |
| 514 | // TestLiveStoreConsumeTracksPartitionLag verifies that partition lag is tracked |
| 515 | // for all records regardless of outcome: too old (dropped), decode failure, or |
nothing calls this directly
no test coverage detected