MCPcopy
hub / github.com/nats-io/nats.go / TestKeyValueRePublish

Function TestKeyValueRePublish

jetstream/test/kv_test.go:1641–1698  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1639}
1640
1641func TestKeyValueRePublish(t *testing.T) {
1642 s := RunBasicJetStreamServer()
1643 defer shutdownJSServerAndRemoveStorage(t, s)
1644
1645 nc, js := jsClient(t, s)
1646 defer nc.Close()
1647 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1648 defer cancel()
1649
1650 if _, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
1651 Bucket: "TEST_UPDATE",
1652 }); err != nil {
1653 t.Fatalf("Error creating store: %v", err)
1654 }
1655 // This is expected to fail since server does not support as of now
1656 // the update of RePublish.
1657 if _, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
1658 Bucket: "TEST_UPDATE",
1659 RePublish: &jetstream.RePublish{Source: ">", Destination: "bar.>"},
1660 }); err == nil {
1661 t.Fatal("Expected failure, did not get one")
1662 }
1663
1664 kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
1665 Bucket: "TEST",
1666 RePublish: &jetstream.RePublish{Source: ">", Destination: "bar.>"},
1667 })
1668 if err != nil {
1669 t.Fatalf("Error creating store: %v", err)
1670 }
1671 si, err := js.Stream(ctx, "KV_TEST")
1672 if err != nil {
1673 t.Fatalf("Error getting stream info: %v", err)
1674 }
1675 if si.CachedInfo().Config.RePublish == nil {
1676 t.Fatal("Expected republish to be set, it was not")
1677 }
1678
1679 sub, err := nc.SubscribeSync("bar.>")
1680 if err != nil {
1681 t.Fatalf("Error on sub: %v", err)
1682 }
1683 if _, err := kv.Put(ctx, "foo", []byte("value")); err != nil {
1684 t.Fatalf("Error on put: %v", err)
1685 }
1686 msg, err := sub.NextMsg(time.Second)
1687 if err != nil {
1688 t.Fatalf("Error on next: %v", err)
1689 }
1690 if v := string(msg.Data); v != "value" {
1691 t.Fatalf("Unexpected value: %s", v)
1692 }
1693 // The message should also have a header with the actual subject
1694 expected := "$KV.TEST.foo"
1695 if v := msg.Header.Get(jetstream.SubjectHeader); v != expected {
1696 t.Fatalf("Expected subject header %q, got %q", expected, v)
1697 }
1698}

Callers

nothing calls this directly

Calls 12

Fatalf · Method · 0.80
NextMsg · Method · 0.80
RunBasicJetStreamServer · Function · 0.70
jsClient · Function · 0.70
CreateKeyValue · Method · 0.65
Stream · Method · 0.65
CachedInfo · Method · 0.65
SubscribeSync · Method · 0.65
Put · Method · 0.65
Get · Method · 0.65
Close · Method · 0.45

Tested by

no test coverage detected