MCPcopy
hub / github.com/nats-io/nats.go / TestKeyValueLimitMarkerTTL

Function TestKeyValueLimitMarkerTTL

jetstream/test/kv_test.go:1958–2080  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1956}
1957
1958func TestKeyValueLimitMarkerTTL(t *testing.T) {
1959 checkMsgHeaders := func(t *testing.T, js jetstream.JetStream, kv jetstream.KeyValue, key, expectedTTL, expectedReason string) {
1960 t.Helper()
1961 ctx := context.Background()
1962 stream, err := js.Stream(ctx, "KV_KVS")
1963 expectOk(t, err)
1964 msg, err := stream.GetLastMsgForSubject(ctx, "$KV.KVS."+key)
1965 expectOk(t, err)
1966 marker := msg.Header.Get(jetstream.MarkerReasonHeader)
1967 if marker != expectedReason {
1968 t.Fatalf("Expected marker to be MaxAge, got %q", marker)
1969 }
1970 ttl := msg.Header.Get(jetstream.MsgTTLHeader)
1971 if ttl != expectedTTL {
1972 t.Fatalf("Expected TTL to be 1s, got %q", ttl)
1973 }
1974 }
1975
1976 checkMsgNotFound := func(t *testing.T, js jetstream.JetStream, kv jetstream.KeyValue, key string) {
1977 t.Helper()
1978 ctx := context.Background()
1979 stream, err := js.Stream(ctx, "KV_KVS")
1980 expectOk(t, err)
1981 msg, err := stream.GetLastMsgForSubject(ctx, "$KV.KVS."+key)
1982 if err == nil {
1983 t.Fatalf("Expected error getting message, got %v", msg)
1984 }
1985 if !errors.Is(err, jetstream.ErrMsgNotFound) {
1986 t.Fatalf("Expected error to be ErrMsgNotFound, got: %v", err)
1987 }
1988 }
1989
1990 t.Run("create with TTL", func(t *testing.T) {
1991 s := RunBasicJetStreamServer()
1992 defer shutdownJSServerAndRemoveStorage(t, s)
1993
1994 nc, js := jsClient(t, s)
1995 defer nc.Close()
1996 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1997 defer cancel()
1998
1999 kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "KVS", LimitMarkerTTL: time.Second})
2000 expectOk(t, err)
2001
2002 // Put in a few names and ages.
2003 _, err = kv.Create(ctx, "age", []byte("22"), jetstream.KeyTTL(time.Second))
2004 expectOk(t, err)
2005
2006 // create watcher to wait for deletion
2007 watcher, err := kv.WatchAll(ctx, jetstream.UpdatesOnly())
2008 expectOk(t, err)
2009
2010 _, err = kv.Get(ctx, "age")
2011 expectOk(t, err)
2012 time.Sleep(1500 * time.Millisecond)
2013
2014 _, err = kv.Get(ctx, "age")
2015 expectErr(t, err, jetstream.ErrKeyNotFound)

Callers

nothing calls this directly

Calls 15

KeyTTLFunction · 0.92
UpdatesOnlyFunction · 0.92
PurgeTTLFunction · 0.92
FatalfMethod · 0.80
expectOkFunction · 0.70
RunBasicJetStreamServerFunction · 0.70
jsClientFunction · 0.70
expectErrFunction · 0.70
StreamMethod · 0.65
GetLastMsgForSubjectMethod · 0.65
GetMethod · 0.65

Tested by

no test coverage detected