MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamRePublish

Function TestJetStreamRePublish

test/js_test.go:9806–9871  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

9804}
9805
9806func TestJetStreamRePublish(t *testing.T) {
9807 s := RunBasicJetStreamServer()
9808 defer shutdownJSServerAndRemoveStorage(t, s)
9809
9810 nc, js := jsClient(t, s)
9811 defer nc.Close()
9812
9813 if _, err := js.AddStream(&nats.StreamConfig{
9814 Name: "RP",
9815 Storage: nats.MemoryStorage,
9816 Subjects: []string{"foo", "bar", "baz"},
9817 RePublish: &nats.RePublish{
9818 Source: ">",
9819 Destination: "RP.>",
9820 },
9821 }); err != nil {
9822 t.Fatalf("Error adding stream: %v", err)
9823 }
9824
9825 sub, err := nc.SubscribeSync("RP.>")
9826 if err != nil {
9827 t.Fatalf("Error on subscribe: %v", err)
9828 }
9829
9830 msg, toSend := []byte("OK TO REPUBLISH?"), 100
9831 for i := 0; i < toSend; i++ {
9832 js.Publish("foo", msg)
9833 js.Publish("bar", msg)
9834 js.Publish("baz", msg)
9835 }
9836
9837 lseq := map[string]int{
9838 "foo": 0,
9839 "bar": 0,
9840 "baz": 0,
9841 }
9842
9843 for i := 1; i <= toSend; i++ {
9844 m, err := sub.NextMsg(time.Second)
9845 if err != nil {
9846 t.Fatalf("Error on next msg: %v", err)
9847 }
9848 // Grab info from Header
9849 stream := m.Header.Get(nats.JSStream)
9850 if stream != "RP" {
9851 t.Fatalf("Unexpected header: %+v", m.Header)
9852 }
9853 // Make sure sequence is correct.
9854 seq, err := strconv.Atoi(m.Header.Get(nats.JSSequence))
9855 if err != nil {
9856 t.Fatalf("Error decoding sequence for %s", m.Header.Get(nats.JSSequence))
9857 }
9858 if seq != i {
9859 t.Fatalf("Expected sequence to be %v, got %v", i, seq)
9860 }
9861 // Make sure last sequence matches last seq we received on this subject.
9862 last, err := strconv.Atoi(m.Header.Get(nats.JSLastSequence))
9863 if err != nil {

Callers

nothing calls this directly

Calls 10

Fatalf — Method · 0.80
NextMsg — Method · 0.80
RunBasicJetStreamServer — Function · 0.70
jsClient — Function · 0.70
AddStream — Method · 0.65
SubscribeSync — Method · 0.65
Publish — Method · 0.65
Get — Method · 0.65
Close — Method · 0.45

Tested by

no test coverage detected