MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamSubscribe_AckPolicy

Function TestJetStreamSubscribe_AckPolicy

test/js_test.go:4882–5266  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

4880}
4881
4882func TestJetStreamSubscribe_AckPolicy(t *testing.T) {
4883 s := RunBasicJetStreamServer()
4884 defer shutdownJSServerAndRemoveStorage(t, s)
4885
4886 nc, js := jsClient(t, s)
4887 defer nc.Close()
4888
4889 var err error
4890
4891 // Create the stream using our client API.
4892 _, err = js.AddStream(&nats.StreamConfig{
4893 Name: "TEST",
4894 Subjects: []string{"foo", "bar"},
4895 })
4896 if err != nil {
4897 t.Fatalf("Unexpected error: %v", err)
4898 }
4899
4900 for i := 0; i < 10; i++ {
4901 payload := fmt.Sprintf("i:%d", i)
4902 js.Publish("foo", []byte(payload))
4903 }
4904
4905 for _, test := range []struct {
4906 name string
4907 subopt nats.SubOpt
4908 expected nats.AckPolicy
4909 }{
4910 {
4911 "ack-none", nats.AckNone(), nats.AckNonePolicy,
4912 },
4913 {
4914 "ack-all", nats.AckAll(), nats.AckAllPolicy,
4915 },
4916 {
4917 "ack-explicit", nats.AckExplicit(), nats.AckExplicitPolicy,
4918 },
4919 } {
4920 test := test
4921 t.Run(test.name, func(t *testing.T) {
4922 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
4923 defer cancel()
4924
4925 got := 0
4926 totalMsgs := 10
4927 sub, err := js.Subscribe("foo", func(m *nats.Msg) {
4928 got++
4929 if got == totalMsgs {
4930 cancel()
4931 }
4932 }, test.subopt, nats.Durable(test.name))
4933
4934 if err != nil {
4935 t.Fatalf("Unexpected error: %v", err)
4936 }
4937
4938 <-ctx.Done()
4939 if got != totalMsgs {

Callers

nothing calls this directly

Calls 15

Fatalf · Method · 0.80
NextMsg · Method · 0.80
Errorf · Method · 0.80
AckSync · Method · 0.80
NextMsgWithContext · Method · 0.80
NewInbox · Method · 0.80
Unsubscribe · Method · 0.80
RunBasicJetStreamServer · Function · 0.70
jsClient · Function · 0.70
AddStream · Method · 0.65
Publish · Method · 0.65

Tested by

no test coverage detected