(t *testing.T)
| 534 | } |
| 535 | |
| 536 | func TestKeyValueWatchContext(t *testing.T) { |
| 537 | s := RunBasicJetStreamServer() |
| 538 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 539 | |
| 540 | nc, js := jsClient(t, s) |
| 541 | defer nc.Close() |
| 542 | |
| 543 | kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "WATCHCTX"}) |
| 544 | expectOk(t, err) |
| 545 | |
| 546 | ctx, cancel := context.WithCancel(context.Background()) |
| 547 | defer cancel() |
| 548 | |
| 549 | watcher, err := kv.WatchAll(nats.Context(ctx)) |
| 550 | expectOk(t, err) |
| 551 | defer watcher.Stop() |
| 552 | |
| 553 | // Trigger unsubscribe internally. |
| 554 | cancel() |
| 555 | |
| 556 | // Wait for a bit for unsubscribe to be done. |
| 557 | time.Sleep(500 * time.Millisecond) |
| 558 | |
| 559 | // Stopping watch that is already stopped via cancellation propagation is an error. |
| 560 | err = watcher.Stop() |
| 561 | if err == nil || err != nats.ErrBadSubscription { |
| 562 | t.Errorf("Expected invalid subscription, got: %v", err) |
| 563 | } |
| 564 | } |
| 565 | |
| 566 | func TestKeyValueWatchContextUpdates(t *testing.T) { |
| 567 | s := RunBasicJetStreamServer() |
nothing calls this directly
no test coverage detected