TestLiveStorePartitionDownscale tests the complete partition downscale workflow: - Marking a partition as INACTIVE - Verifying new traffic goes to active partitions - Cancelling the downscale - Verifying normal operation resumes
(t *testing.T)
| 19 | // - Cancelling the downscale |
| 20 | // - Verifying normal operation resumes |
| 21 | func TestLiveStorePartitionDownscale(t *testing.T) { |
| 22 | util.RunIntegrationTests(t, util.TestHarnessConfig{}, func(h *util.TempoHarness) { |
| 23 | // harness creates 2 live stores that own a first partition, shutdown B we don't want it for this test |
| 24 | require.NoError(t, h.Services[util.ServiceLiveStoreZoneB].Stop()) |
| 25 | |
| 26 | // start a new livestorea that own a second partition. -1 postfix is important to start a new partition! |
| 27 | liveStorePartition1 := newLiveStore("live-store-zone-a-1", "zone-a") |
| 28 | require.NoError(t, h.TestScenario.StartAndWaitReady(liveStorePartition1)) |
| 29 | |
| 30 | // wait for 2 active partitions |
| 31 | waitActivePartitions(t, h.Services[util.ServiceDistributor], 2) |
| 32 | |
| 33 | info := tempoUtil.NewTraceInfo(time.Now(), "") |
| 34 | require.NoError(t, h.WriteTraceInfo(info, "")) |
| 35 | |
| 36 | liveStorePartition0 := h.Services[util.ServiceLiveStoreZoneA] |
| 37 | liveStoreActive := waitForTraceInLiveStore(t, 1, liveStorePartition0, liveStorePartition1) |
| 38 | var liveStoreDownscale *e2e.HTTPService |
| 39 | if liveStoreActive == liveStorePartition0 { |
| 40 | liveStoreDownscale = liveStorePartition1 |
| 41 | } else { |
| 42 | liveStoreDownscale = liveStorePartition0 |
| 43 | } |
| 44 | |
| 45 | apiClient := h.APIClientHTTP("") |
| 46 | distributor := h.Services[util.ServiceDistributor] |
| 47 | |
| 48 | t.Run("verify partition is ACTIVE", func(t *testing.T) { |
| 49 | for _, liveStore := range []*e2e.HTTPService{liveStoreDownscale, liveStoreActive} { |
| 50 | res := preparePartitionDownscale(t, http.MethodGet, liveStore) |
| 51 | require.Equal(t, "PartitionActive", res.State) |
| 52 | } |
| 53 | }) |
| 54 | |
| 55 | t.Run("prepare partition for downscale", func(t *testing.T) { |
| 56 | // Set live-store's partition to INACTIVE |
| 57 | res := preparePartitionDownscale(t, http.MethodPost, liveStoreDownscale) |
| 58 | require.Greater(t, res.Timestamp, int64(0)) |
| 59 | require.Equal(t, "PartitionInactive", res.State) |
| 60 | |
| 61 | // Verify state |
| 62 | verifyPartitionState(t, distributor, "Inactive", 1) |
| 63 | verifyPartitionState(t, distributor, "Active", 1) |
| 64 | }) |
| 65 | |
| 66 | t.Run("verify data is still accessible during downscale", func(t *testing.T) { |
| 67 | util.QueryAndAssertTrace(t, apiClient, info) |
| 68 | }) |
| 69 | |
| 70 | t.Run("generate new trace during downscale", func(t *testing.T) { |
| 71 | inactiveCount, err := liveStoreDownscale.SumMetrics([]string{"tempo_live_store_records_processed_total"}, e2e.SkipMissingMetrics) |
| 72 | require.NoError(t, err) |
| 73 | |
| 74 | info := tempoUtil.NewTraceInfo(time.Now(), "") |
| 75 | require.NoError(t, h.WriteTraceInfo(info, "")) |
| 76 | |
| 77 | require.NoError(t, liveStoreActive.WaitSumMetrics(e2e.Equals(float64(2)), "tempo_live_store_traces_created_total")) |
| 78 |
nothing calls this directly
no test coverage detected