(t *testing.T)
| 9871 | } |
| 9872 | |
| 9873 | func TestJetStreamDirectGetMsg(t *testing.T) { |
| 9874 | // Using standalone server here, we are testing the client side API, not |
| 9875 | // the server feature, which has tests checking it works in cluster mode. |
| 9876 | s := RunBasicJetStreamServer() |
| 9877 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 9878 | |
| 9879 | nc, js := jsClient(t, s) |
| 9880 | defer nc.Close() |
| 9881 | |
| 9882 | si, err := js.AddStream(&nats.StreamConfig{ |
| 9883 | Name: "DGM", |
| 9884 | Storage: nats.MemoryStorage, |
| 9885 | Subjects: []string{"foo", "bar"}, |
| 9886 | }) |
| 9887 | if err != nil { |
| 9888 | t.Fatalf("Error adding stream: %v", err) |
| 9889 | } |
| 9890 | |
| 9891 | send := func(subj, body string) { |
| 9892 | t.Helper() |
| 9893 | if _, err := js.Publish(subj, []byte(body)); err != nil { |
| 9894 | t.Fatalf("Error on publish: %v", err) |
| 9895 | } |
| 9896 | } |
| 9897 | |
| 9898 | send("foo", "a") |
| 9899 | send("foo", "b") |
| 9900 | send("foo", "c") |
| 9901 | send("bar", "d") |
| 9902 | send("foo", "e") |
| 9903 | |
| 9904 | // Without AllowDirect, we should get no responders |
| 9905 | if _, err := js.GetMsg("DGM", 1, nats.DirectGet()); err != nats.ErrNoResponders { |
| 9906 | t.Fatalf("Unexpected error: %v", err) |
| 9907 | } |
| 9908 | |
| 9909 | // Update stream: |
| 9910 | si.Config.AllowDirect = true |
| 9911 | si, err = js.UpdateStream(&si.Config) |
| 9912 | if err != nil { |
| 9913 | t.Fatalf("Error updating stream: %v", err) |
| 9914 | } |
| 9915 | if !si.Config.AllowDirect { |
| 9916 | t.Fatalf("AllowDirect should be true: %+v", si) |
| 9917 | } |
| 9918 | |
| 9919 | check := func(seq uint64, opt nats.JSOpt, useGetLast bool, expectedSubj string, expectedSeq uint64, expectedBody string) { |
| 9920 | t.Helper() |
| 9921 | |
| 9922 | var msg *nats.RawStreamMsg |
| 9923 | var err error |
| 9924 | if useGetLast { |
| 9925 | msg, err = js.GetLastMsg("DGM", expectedSubj, []nats.JSOpt{opt}...) |
| 9926 | } else { |
| 9927 | msg, err = js.GetMsg("DGM", seq, []nats.JSOpt{opt}...) |
| 9928 | } |
| 9929 | if err != nil { |
| 9930 | t.Fatalf("Unable to get message: %v", err) |
nothing calls this directly
no test coverage detected