MCPcopy
hub / github.com/nats-io/nats.go / TestKeyValueWatchContextUpdates

Function TestKeyValueWatchContextUpdates

jetstream/test/kv_test.go:731–766  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

729}
730
731func TestKeyValueWatchContextUpdates(t *testing.T) {
732 s := RunBasicJetStreamServer()
733 defer shutdownJSServerAndRemoveStorage(t, s)
734
735 nc, js := jsClient(t, s)
736 defer nc.Close()
737 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
738 defer cancel()
739
740 kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "WATCHCTX"})
741 expectOk(t, err)
742
743 watcher, err := kv.WatchAll(ctx)
744 expectOk(t, err)
745 defer watcher.Stop()
746
747 // Pull the initial state done marker which is nil.
748 select {
749 case v := <-watcher.Updates():
750 if v != nil {
751 t.Fatalf("Expected nil marker, got %+v", v)
752 }
753 case <-time.After(time.Second):
754 t.Fatalf("Did not receive nil marker like expected")
755 }
756
757 // Fire a timer and cancel the context after 250ms.
758 time.AfterFunc(250*time.Millisecond, cancel)
759
760 // Make sure canceling will break us out here.
761 select {
762 case <-watcher.Updates():
763 case <-time.After(5 * time.Second):
764 t.Fatalf("Did not break out like expected")
765 }
766}
767
768func TestKeyValueBindStore(t *testing.T) {
769 s := RunBasicJetStreamServer()

Callers

nothing calls this directly

Calls 10

FatalfMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
jsClientFunction · 0.70
expectOkFunction · 0.70
CreateKeyValueMethod · 0.65
WatchAllMethod · 0.65
StopMethod · 0.65
UpdatesMethod · 0.65
CloseMethod · 0.45

Tested by

no test coverage detected