MCPcopy
hub / github.com/grafana/tempo / TestLiveStorePartitionDownscale

Function TestLiveStorePartitionDownscale

integration/operations/livestore_test.go:21–112  ·  integration/operations/livestore_test.go::TestLiveStorePartitionDownscale

TestLiveStorePartitionDownscale tests the complete partition downscale workflow: - Marking a partition as INACTIVE - Verifying new traffic goes to active partitions - Cancelling the downscale - Verifying normal operation resumes

(t *testing.T)

Source from the content-addressed store, hash-verified

19// - Cancelling the downscale
20// - Verifying normal operation resumes
21func TestLiveStorePartitionDownscale(t *testing.T) {
22 util.RunIntegrationTests(t, util.TestHarnessConfig{}, func(h *util.TempoHarness) {
23 // harness creates 2 live stores that own a first partition, shutdown B we don't want it for this test
24 require.NoError(t, h.Services[util.ServiceLiveStoreZoneB].Stop())
25
26 // start a new livestorea that own a second partition. -1 postfix is important to start a new partition!
27 liveStorePartition1 := newLiveStore("live-store-zone-a-1", "zone-a")
28 require.NoError(t, h.TestScenario.StartAndWaitReady(liveStorePartition1))
29
30 // wait for 2 active partitions
31 waitActivePartitions(t, h.Services[util.ServiceDistributor], 2)
32
33 info := tempoUtil.NewTraceInfo(time.Now(), "")
34 require.NoError(t, h.WriteTraceInfo(info, ""))
35
36 liveStorePartition0 := h.Services[util.ServiceLiveStoreZoneA]
37 liveStoreActive := waitForTraceInLiveStore(t, 1, liveStorePartition0, liveStorePartition1)
38 var liveStoreDownscale *e2e.HTTPService
39 if liveStoreActive == liveStorePartition0 {
40 liveStoreDownscale = liveStorePartition1
41 } else {
42 liveStoreDownscale = liveStorePartition0
43 }
44
45 apiClient := h.APIClientHTTP("")
46 distributor := h.Services[util.ServiceDistributor]
47
48 t.Run("verify partition is ACTIVE", func(t *testing.T) {
49 for _, liveStore := range []*e2e.HTTPService{liveStoreDownscale, liveStoreActive} {
50 res := preparePartitionDownscale(t, http.MethodGet, liveStore)
51 require.Equal(t, "PartitionActive", res.State)
52 }
53 })
54
55 t.Run("prepare partition for downscale", func(t *testing.T) {
56 // Set live-store's partition to INACTIVE
57 res := preparePartitionDownscale(t, http.MethodPost, liveStoreDownscale)
58 require.Greater(t, res.Timestamp, int64(0))
59 require.Equal(t, "PartitionInactive", res.State)
60
61 // Verify state
62 verifyPartitionState(t, distributor, "Inactive", 1)
63 verifyPartitionState(t, distributor, "Active", 1)
64 })
65
66 t.Run("verify data is still accessible during downscale", func(t *testing.T) {
67 util.QueryAndAssertTrace(t, apiClient, info)
68 })
69
70 t.Run("generate new trace during downscale", func(t *testing.T) {
71 inactiveCount, err := liveStoreDownscale.SumMetrics([]string{"tempo_live_store_records_processed_total"}, e2e.SkipMissingMetrics)
72 require.NoError(t, err)
73
74 info := tempoUtil.NewTraceInfo(time.Now(), "")
75 require.NoError(t, h.WriteTraceInfo(info, ""))
76
77 require.NoError(t, liveStoreActive.WaitSumMetrics(e2e.Equals(float64(2)), "tempo_live_store_traces_created_total"))
78

Callers

nothing calls this directly

Calls 12

newLiveStoreFunction · 0.85
waitActivePartitionsFunction · 0.85
waitForTraceInLiveStoreFunction · 0.85
verifyPartitionStateFunction · 0.85
WriteTraceInfoMethod · 0.80
APIClientHTTPMethod · 0.80
EqualsMethod · 0.80
StopMethod · 0.65
NowMethod · 0.65
RunMethod · 0.45
EqualMethod · 0.45

Tested by

no test coverage detected