(t *testing.T)
| 1956 | } |
| 1957 | |
| 1958 | func TestKeyValueLimitMarkerTTL(t *testing.T) { |
| 1959 | checkMsgHeaders := func(t *testing.T, js jetstream.JetStream, kv jetstream.KeyValue, key, expectedTTL, expectedReason string) { |
| 1960 | t.Helper() |
| 1961 | ctx := context.Background() |
| 1962 | stream, err := js.Stream(ctx, "KV_KVS") |
| 1963 | expectOk(t, err) |
| 1964 | msg, err := stream.GetLastMsgForSubject(ctx, "$KV.KVS."+key) |
| 1965 | expectOk(t, err) |
| 1966 | marker := msg.Header.Get(jetstream.MarkerReasonHeader) |
| 1967 | if marker != expectedReason { |
| 1968 | t.Fatalf("Expected marker to be MaxAge, got %q", marker) |
| 1969 | } |
| 1970 | ttl := msg.Header.Get(jetstream.MsgTTLHeader) |
| 1971 | if ttl != expectedTTL { |
| 1972 | t.Fatalf("Expected TTL to be 1s, got %q", ttl) |
| 1973 | } |
| 1974 | } |
| 1975 | |
| 1976 | checkMsgNotFound := func(t *testing.T, js jetstream.JetStream, kv jetstream.KeyValue, key string) { |
| 1977 | t.Helper() |
| 1978 | ctx := context.Background() |
| 1979 | stream, err := js.Stream(ctx, "KV_KVS") |
| 1980 | expectOk(t, err) |
| 1981 | msg, err := stream.GetLastMsgForSubject(ctx, "$KV.KVS."+key) |
| 1982 | if err == nil { |
| 1983 | t.Fatalf("Expected error getting message, got %v", msg) |
| 1984 | } |
| 1985 | if !errors.Is(err, jetstream.ErrMsgNotFound) { |
| 1986 | t.Fatalf("Expected error to be ErrMsgNotFound, got: %v", err) |
| 1987 | } |
| 1988 | } |
| 1989 | |
| 1990 | t.Run("create with TTL", func(t *testing.T) { |
| 1991 | s := RunBasicJetStreamServer() |
| 1992 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 1993 | |
| 1994 | nc, js := jsClient(t, s) |
| 1995 | defer nc.Close() |
| 1996 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 1997 | defer cancel() |
| 1998 | |
| 1999 | kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "KVS", LimitMarkerTTL: time.Second}) |
| 2000 | expectOk(t, err) |
| 2001 | |
| 2002 | // Put in a few names and ages. |
| 2003 | _, err = kv.Create(ctx, "age", []byte("22"), jetstream.KeyTTL(time.Second)) |
| 2004 | expectOk(t, err) |
| 2005 | |
| 2006 | // create watcher to wait for deletion |
| 2007 | watcher, err := kv.WatchAll(ctx, jetstream.UpdatesOnly()) |
| 2008 | expectOk(t, err) |
| 2009 | |
| 2010 | _, err = kv.Get(ctx, "age") |
| 2011 | expectOk(t, err) |
| 2012 | time.Sleep(1500 * time.Millisecond) |
| 2013 | |
| 2014 | _, err = kv.Get(ctx, "age") |
| 2015 | expectErr(t, err, jetstream.ErrKeyNotFound) |
nothing calls this directly
no test coverage detected