MCPcopy
hub / github.com/grafana/dskit / TestLifecycler_HeartbeatAfterBackendReset

Function TestLifecycler_HeartbeatAfterBackendReset

ring/lifecycler_test.go:867–929  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

865}
866
867func TestLifecycler_HeartbeatAfterBackendReset(t *testing.T) {
868 t.Parallel()
869
870 ctx := context.Background()
871
872 store, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
873 t.Cleanup(func() { assert.NoError(t, closer.Close()) })
874
875 var ringCfg Config
876 flagext.DefaultValues(&ringCfg)
877 ringCfg.KVStore.Mock = store
878
879 r, err := New(ringCfg, "ingester", testRingKey, log.NewNopLogger(), nil)
880 require.NoError(t, err)
881 require.NoError(t, services.StartAndAwaitRunning(ctx, r))
882 t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, r)) })
883
884 lifecyclerCfg := testLifecyclerConfig(ringCfg, testInstanceID)
885
886 lifecycler, err := NewLifecycler(lifecyclerCfg, nil, testRingName, testRingKey, true, log.NewNopLogger(), nil)
887 require.NoError(t, err)
888 require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler))
889 t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler)) })
890
891 // Wait until the instance has joined, is active, and has one token.
892 test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
893 d, err := r.KVClient.Get(ctx, testRingKey)
894 require.NoError(t, err)
895 return checkNormalised(d, testInstanceID)
896 })
897
898 // At this point the instance has been registered to the ring.
899 prevRegisteredAt := lifecycler.getRegisteredAt()
900 prevTokens := lifecycler.getTokens()
901
902 // Wait at least 1s because the registration timestamp has seconds precision
903 // and we want to assert it gets updates later on in this test.
904 time.Sleep(time.Second)
905
906 // Now we delete it from the ring to simulate a ring storage reset and we expect the next heartbeat
907 // will restore it.
908 require.NoError(t, store.CAS(ctx, testRingKey, func(interface{}) (out interface{}, retry bool, err error) {
909 return NewDesc(), true, nil
910 }))
911
912 test.Poll(t, time.Second, true, func() interface{} {
913 _, ok := getInstanceFromStore(t, store, testInstanceID)
914 return ok
915 })
916
917 // Ensure the registration timestamp has been updated.
918 desc, _ := getInstanceFromStore(t, store, testInstanceID)
919 assert.Greater(t, desc.GetRegisteredTimestamp(), prevRegisteredAt.Unix())
920 assert.Greater(t, lifecycler.getRegisteredAt().Unix(), prevRegisteredAt.Unix())
921
922 // Ensure other information has been preserved.
923 assert.Greater(t, desc.GetTimestamp(), int64(0))
924 assert.Equal(t, testInstanceID, desc.GetId())

Callers

nothing calls this directly

Calls 15

getRegisteredAtMethod · 0.95
getTokensMethod · 0.95
NewInMemoryClientFunction · 0.92
DefaultValuesFunction · 0.92
StartAndAwaitRunningFunction · 0.92
StopAndAwaitTerminatedFunction · 0.92
PollFunction · 0.92
GetCodecFunction · 0.85
testLifecyclerConfigFunction · 0.85
NewLifecyclerFunction · 0.85
checkNormalisedFunction · 0.85
NewDescFunction · 0.85

Tested by

no test coverage detected