(t *testing.T)
| 1639 | } |
| 1640 | |
| 1641 | func TestKeyValueRePublish(t *testing.T) { |
| 1642 | s := RunBasicJetStreamServer() |
| 1643 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 1644 | |
| 1645 | nc, js := jsClient(t, s) |
| 1646 | defer nc.Close() |
| 1647 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 1648 | defer cancel() |
| 1649 | |
| 1650 | if _, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ |
| 1651 | Bucket: "TEST_UPDATE", |
| 1652 | }); err != nil { |
| 1653 | t.Fatalf("Error creating store: %v", err) |
| 1654 | } |
| 1655 | // This is expected to fail since server does not support as of now |
| 1656 | // the update of RePublish. |
| 1657 | if _, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ |
| 1658 | Bucket: "TEST_UPDATE", |
| 1659 | RePublish: &jetstream.RePublish{Source: ">", Destination: "bar.>"}, |
| 1660 | }); err == nil { |
| 1661 | t.Fatal("Expected failure, did not get one") |
| 1662 | } |
| 1663 | |
| 1664 | kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ |
| 1665 | Bucket: "TEST", |
| 1666 | RePublish: &jetstream.RePublish{Source: ">", Destination: "bar.>"}, |
| 1667 | }) |
| 1668 | if err != nil { |
| 1669 | t.Fatalf("Error creating store: %v", err) |
| 1670 | } |
| 1671 | si, err := js.Stream(ctx, "KV_TEST") |
| 1672 | if err != nil { |
| 1673 | t.Fatalf("Error getting stream info: %v", err) |
| 1674 | } |
| 1675 | if si.CachedInfo().Config.RePublish == nil { |
| 1676 | t.Fatal("Expected republish to be set, it was not") |
| 1677 | } |
| 1678 | |
| 1679 | sub, err := nc.SubscribeSync("bar.>") |
| 1680 | if err != nil { |
| 1681 | t.Fatalf("Error on sub: %v", err) |
| 1682 | } |
| 1683 | if _, err := kv.Put(ctx, "foo", []byte("value")); err != nil { |
| 1684 | t.Fatalf("Error on put: %v", err) |
| 1685 | } |
| 1686 | msg, err := sub.NextMsg(time.Second) |
| 1687 | if err != nil { |
| 1688 | t.Fatalf("Error on next: %v", err) |
| 1689 | } |
| 1690 | if v := string(msg.Data); v != "value" { |
| 1691 | t.Fatalf("Unexpected value: %s", v) |
| 1692 | } |
| 1693 | // The message should also have a header with the actual subject |
| 1694 | expected := "$KV.TEST.foo" |
| 1695 | if v := msg.Header.Get(jetstream.SubjectHeader); v != expected { |
| 1696 | t.Fatalf("Expected subject header %q, got %q", expected, v) |
| 1697 | } |
| 1698 | } |
nothing calls this directly
no test coverage detected