(t *testing.T)
| 9804 | } |
| 9805 | |
| 9806 | func TestJetStreamRePublish(t *testing.T) { |
| 9807 | s := RunBasicJetStreamServer() |
| 9808 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 9809 | |
| 9810 | nc, js := jsClient(t, s) |
| 9811 | defer nc.Close() |
| 9812 | |
| 9813 | if _, err := js.AddStream(&nats.StreamConfig{ |
| 9814 | Name: "RP", |
| 9815 | Storage: nats.MemoryStorage, |
| 9816 | Subjects: []string{"foo", "bar", "baz"}, |
| 9817 | RePublish: &nats.RePublish{ |
| 9818 | Source: ">", |
| 9819 | Destination: "RP.>", |
| 9820 | }, |
| 9821 | }); err != nil { |
| 9822 | t.Fatalf("Error adding stream: %v", err) |
| 9823 | } |
| 9824 | |
| 9825 | sub, err := nc.SubscribeSync("RP.>") |
| 9826 | if err != nil { |
| 9827 | t.Fatalf("Error on subscribe: %v", err) |
| 9828 | } |
| 9829 | |
| 9830 | msg, toSend := []byte("OK TO REPUBLISH?"), 100 |
| 9831 | for i := 0; i < toSend; i++ { |
| 9832 | js.Publish("foo", msg) |
| 9833 | js.Publish("bar", msg) |
| 9834 | js.Publish("baz", msg) |
| 9835 | } |
| 9836 | |
| 9837 | lseq := map[string]int{ |
| 9838 | "foo": 0, |
| 9839 | "bar": 0, |
| 9840 | "baz": 0, |
| 9841 | } |
| 9842 | |
| 9843 | for i := 1; i <= toSend; i++ { |
| 9844 | m, err := sub.NextMsg(time.Second) |
| 9845 | if err != nil { |
| 9846 | t.Fatalf("Error on next msg: %v", err) |
| 9847 | } |
| 9848 | // Grab info from Header |
| 9849 | stream := m.Header.Get(nats.JSStream) |
| 9850 | if stream != "RP" { |
| 9851 | t.Fatalf("Unexpected header: %+v", m.Header) |
| 9852 | } |
| 9853 | // Make sure sequence is correct. |
| 9854 | seq, err := strconv.Atoi(m.Header.Get(nats.JSSequence)) |
| 9855 | if err != nil { |
| 9856 | t.Fatalf("Error decoding sequence for %s", m.Header.Get(nats.JSSequence)) |
| 9857 | } |
| 9858 | if seq != i { |
| 9859 | t.Fatalf("Expected sequence to be %v, got %v", i, seq) |
| 9860 | } |
| 9861 | // Make sure last sequence matches last seq we received on this subject. |
| 9862 | last, err := strconv.Atoi(m.Header.Get(nats.JSLastSequence)) |
| 9863 | if err != nil { |
nothing calls this directly
no test coverage detected