(t *testing.T)
| 930 | } |
| 931 | |
| 932 | func TestRequeueOnError(t *testing.T) { |
| 933 | tmpDir := t.TempDir() |
| 934 | |
| 935 | cfg := defaultConfig(t, tmpDir) |
| 936 | initialBackoff := 100 * time.Millisecond |
| 937 | cfg.initialBackoff = initialBackoff |
| 938 | cfg.maxBackoff = 3 * initialBackoff |
| 939 | cfg.CompleteBlockConcurrency = 1 // to simplify the test |
| 940 | cfg.holdAllBackgroundProcesses = false |
| 941 | liveStore, err := liveStoreWithConfig(t, cfg) |
| 942 | require.NoError(t, err) |
| 943 | require.NotNil(t, liveStore) |
| 944 | |
| 945 | inst, err := liveStore.getOrCreateInstance(testTenantID) |
| 946 | require.NoError(t, err) |
| 947 | enc := erroredEnc{ |
| 948 | VersionedEncoding: inst.completeBlockEncoding, |
| 949 | mx: sync.Mutex{}, |
| 950 | } |
| 951 | enc.SetError(errors.New("forced error")) |
| 952 | inst.completeBlockEncoding = &enc |
| 953 | |
| 954 | // push data |
| 955 | expectedID, expectedTrace := pushToLiveStore(t, liveStore) |
| 956 | requireTraceInLiveStore(t, liveStore, expectedID, expectedTrace) |
| 957 | requireInstanceState(t, inst, instanceState{liveTraces: 1, walBlocks: 0, completeBlocks: 0}) |
| 958 | |
| 959 | // cut to wal and enqueue complete operation |
| 960 | liveStore.cutAllInstancesToWal() |
| 961 | requireInstanceState(t, inst, instanceState{liveTraces: 0, walBlocks: 1, completeBlocks: 0}) |
| 962 | |
| 963 | // wait for the first backoff that should not be successful |
| 964 | time.Sleep(initialBackoff * 2) |
| 965 | requireInstanceState(t, inst, instanceState{liveTraces: 0, walBlocks: 1, completeBlocks: 0}) |
| 966 | // now completeBlockEncoding does not error and block should be flushed successfully |
| 967 | enc.SetError(nil) |
| 968 | time.Sleep(initialBackoff * 8) |
| 969 | requireInstanceState(t, inst, instanceState{liveTraces: 0, walBlocks: 0, completeBlocks: 1}) |
| 970 | } |
| 971 | |
| 972 | type instanceState struct { |
| 973 | liveTraces int |
nothing calls this directly
no test coverage detected