(t *testing.T)
| 700 | } |
| 701 | |
| 702 | func TestKeyValueWatchContext(t *testing.T) { |
| 703 | s := RunBasicJetStreamServer() |
| 704 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 705 | |
| 706 | nc, js := jsClient(t, s) |
| 707 | defer nc.Close() |
| 708 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 709 | defer cancel() |
| 710 | |
| 711 | kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "WATCHCTX"}) |
| 712 | expectOk(t, err) |
| 713 | |
| 714 | watcher, err := kv.WatchAll(ctx) |
| 715 | expectOk(t, err) |
| 716 | defer watcher.Stop() |
| 717 | |
| 718 | // Trigger unsubscribe internally. |
| 719 | cancel() |
| 720 | |
| 721 | // Wait for a bit for unsubscribe to be done. |
| 722 | time.Sleep(500 * time.Millisecond) |
| 723 | |
| 724 | // Stopping watch that is already stopped via cancellation propagation is an error. |
| 725 | err = watcher.Stop() |
| 726 | if err == nil || err != nats.ErrBadSubscription { |
| 727 | t.Errorf("Expected invalid subscription, got: %v", err) |
| 728 | } |
| 729 | } |
| 730 | |
| 731 | func TestKeyValueWatchContextUpdates(t *testing.T) { |
| 732 | s := RunBasicJetStreamServer() |
nothing calls this directly
no test coverage detected