(t *testing.T)
| 729 | } |
| 730 | |
| 731 | func TestKeyValueWatchContextUpdates(t *testing.T) { |
| 732 | s := RunBasicJetStreamServer() |
| 733 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 734 | |
| 735 | nc, js := jsClient(t, s) |
| 736 | defer nc.Close() |
| 737 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 738 | defer cancel() |
| 739 | |
| 740 | kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "WATCHCTX"}) |
| 741 | expectOk(t, err) |
| 742 | |
| 743 | watcher, err := kv.WatchAll(ctx) |
| 744 | expectOk(t, err) |
| 745 | defer watcher.Stop() |
| 746 | |
| 747 | // Pull the initial state done marker which is nil. |
| 748 | select { |
| 749 | case v := <-watcher.Updates(): |
| 750 | if v != nil { |
| 751 | t.Fatalf("Expected nil marker, got %+v", v) |
| 752 | } |
| 753 | case <-time.After(time.Second): |
| 754 | t.Fatalf("Did not receive nil marker like expected") |
| 755 | } |
| 756 | |
| 757 | // Fire a timer and cancel the context after 250ms. |
| 758 | time.AfterFunc(250*time.Millisecond, cancel) |
| 759 | |
| 760 | // Make sure canceling will break us out here. |
| 761 | select { |
| 762 | case <-watcher.Updates(): |
| 763 | case <-time.After(5 * time.Second): |
| 764 | t.Fatalf("Did not break out like expected") |
| 765 | } |
| 766 | } |
| 767 | |
| 768 | func TestKeyValueBindStore(t *testing.T) { |
| 769 | s := RunBasicJetStreamServer() |
nothing calls this directly
no test coverage detected