(t *testing.T, srvs ...*jsServer)
| 3885 | } |
| 3886 | |
| 3887 | func testJetStreamManagement_GetMsg(t *testing.T, srvs ...*jsServer) { |
| 3888 | s := srvs[0] |
| 3889 | nc, js := jsClient(t, s.Server) |
| 3890 | defer nc.Close() |
| 3891 | |
| 3892 | var err error |
| 3893 | |
| 3894 | _, err = js.AddStream(&nats.StreamConfig{ |
| 3895 | Name: "foo", |
| 3896 | Subjects: []string{"foo.A", "foo.B", "foo.C"}, |
| 3897 | }) |
| 3898 | if err != nil { |
| 3899 | t.Fatalf("Unexpected error: %v", err) |
| 3900 | } |
| 3901 | |
| 3902 | for i := 0; i < 5; i++ { |
| 3903 | msg := nats.NewMsg("foo.A") |
| 3904 | data := fmt.Sprintf("A:%d", i) |
| 3905 | msg.Data = []byte(data) |
| 3906 | msg.Header = nats.Header{ |
| 3907 | "X-NATS-Key": []string{"123"}, |
| 3908 | } |
| 3909 | msg.Header.Add("X-Nats-Test-Data", data) |
| 3910 | js.PublishMsg(msg) |
| 3911 | js.Publish("foo.B", []byte(fmt.Sprintf("B:%d", i))) |
| 3912 | js.Publish("foo.C", []byte(fmt.Sprintf("C:%d", i))) |
| 3913 | } |
| 3914 | |
| 3915 | var originalSeq uint64 |
| 3916 | t.Run("get message", func(t *testing.T) { |
| 3917 | expected := 5 |
| 3918 | msgs := make([]*nats.Msg, 0) |
| 3919 | ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) |
| 3920 | defer cancel() |
| 3921 | |
| 3922 | sub, err := js.Subscribe("foo.C", func(msg *nats.Msg) { |
| 3923 | msgs = append(msgs, msg) |
| 3924 | if len(msgs) == expected { |
| 3925 | cancel() |
| 3926 | } |
| 3927 | }) |
| 3928 | if err != nil { |
| 3929 | t.Fatal(err) |
| 3930 | } |
| 3931 | <-ctx.Done() |
| 3932 | sub.Unsubscribe() |
| 3933 | |
| 3934 | got := len(msgs) |
| 3935 | if got != expected { |
| 3936 | t.Fatalf("Expected: %d, got: %d", expected, got) |
| 3937 | } |
| 3938 | |
| 3939 | msg := msgs[3] |
| 3940 | meta, err := msg.Metadata() |
| 3941 | if err != nil { |
| 3942 | t.Fatal(err) |
| 3943 | } |
| 3944 | originalSeq = meta.Sequence.Stream |
nothing calls this directly
no test coverage detected