(t *testing.T)
| 564 | } |
| 565 | |
| 566 | func TestKeyValueWatchContextUpdates(t *testing.T) { |
| 567 | s := RunBasicJetStreamServer() |
| 568 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 569 | |
| 570 | nc, js := jsClient(t, s) |
| 571 | defer nc.Close() |
| 572 | |
| 573 | kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "WATCHCTX"}) |
| 574 | expectOk(t, err) |
| 575 | |
| 576 | ctx, cancel := context.WithCancel(context.Background()) |
| 577 | defer cancel() |
| 578 | |
| 579 | watcher, err := kv.WatchAll(nats.Context(ctx)) |
| 580 | expectOk(t, err) |
| 581 | defer watcher.Stop() |
| 582 | |
| 583 | // Pull the initial state done marker which is nil. |
| 584 | select { |
| 585 | case v := <-watcher.Updates(): |
| 586 | if v != nil { |
| 587 | t.Fatalf("Expected nil marker, got %+v", v) |
| 588 | } |
| 589 | case <-time.After(time.Second): |
| 590 | t.Fatalf("Did not receive nil marker like expected") |
| 591 | } |
| 592 | |
| 593 | // Fire a timer and cancel the context after 250ms. |
| 594 | time.AfterFunc(250*time.Millisecond, cancel) |
| 595 | |
| 596 | // Make sure canceling will break us out here. |
| 597 | select { |
| 598 | case <-watcher.Updates(): |
| 599 | case <-time.After(5 * time.Second): |
| 600 | t.Fatalf("Did not break out like expected") |
| 601 | } |
| 602 | } |
| 603 | |
| 604 | func TestKeyValueBindStore(t *testing.T) { |
| 605 | s := RunBasicJetStreamServer() |
nothing calls this directly
no test coverage detected