(t *testing.T)
| 4880 | } |
| 4881 | |
| 4882 | func TestJetStreamSubscribe_AckPolicy(t *testing.T) { |
| 4883 | s := RunBasicJetStreamServer() |
| 4884 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 4885 | |
| 4886 | nc, js := jsClient(t, s) |
| 4887 | defer nc.Close() |
| 4888 | |
| 4889 | var err error |
| 4890 | |
| 4891 | // Create the stream using our client API. |
| 4892 | _, err = js.AddStream(&nats.StreamConfig{ |
| 4893 | Name: "TEST", |
| 4894 | Subjects: []string{"foo", "bar"}, |
| 4895 | }) |
| 4896 | if err != nil { |
| 4897 | t.Fatalf("Unexpected error: %v", err) |
| 4898 | } |
| 4899 | |
| 4900 | for i := 0; i < 10; i++ { |
| 4901 | payload := fmt.Sprintf("i:%d", i) |
| 4902 | js.Publish("foo", []byte(payload)) |
| 4903 | } |
| 4904 | |
| 4905 | for _, test := range []struct { |
| 4906 | name string |
| 4907 | subopt nats.SubOpt |
| 4908 | expected nats.AckPolicy |
| 4909 | }{ |
| 4910 | { |
| 4911 | "ack-none", nats.AckNone(), nats.AckNonePolicy, |
| 4912 | }, |
| 4913 | { |
| 4914 | "ack-all", nats.AckAll(), nats.AckAllPolicy, |
| 4915 | }, |
| 4916 | { |
| 4917 | "ack-explicit", nats.AckExplicit(), nats.AckExplicitPolicy, |
| 4918 | }, |
| 4919 | } { |
| 4920 | test := test |
| 4921 | t.Run(test.name, func(t *testing.T) { |
| 4922 | ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) |
| 4923 | defer cancel() |
| 4924 | |
| 4925 | got := 0 |
| 4926 | totalMsgs := 10 |
| 4927 | sub, err := js.Subscribe("foo", func(m *nats.Msg) { |
| 4928 | got++ |
| 4929 | if got == totalMsgs { |
| 4930 | cancel() |
| 4931 | } |
| 4932 | }, test.subopt, nats.Durable(test.name)) |
| 4933 | |
| 4934 | if err != nil { |
| 4935 | t.Fatalf("Unexpected error: %v", err) |
| 4936 | } |
| 4937 | |
| 4938 | <-ctx.Done() |
| 4939 | if got != totalMsgs { |
nothing calls this directly
no test coverage detected