MCPcopy
hub / github.com/nats-io/nats.go / TestAckVariants

Function TestAckVariants

jetstream/test/message_test.go:89–405  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

87}
88
89func TestAckVariants(t *testing.T) {
90 setup := func(ctx context.Context, t *testing.T) (*server.Server, *nats.Conn, jetstream.JetStream, jetstream.Consumer) {
91 srv := RunBasicJetStreamServer()
92
93 nc, err := nats.Connect(srv.ClientURL())
94 if err != nil {
95 t.Fatalf("Unexpected error: %v", err)
96 }
97
98 js, err := jetstream.New(nc)
99 if err != nil {
100 t.Fatalf("Unexpected error: %v", err)
101 }
102
103 s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
104 if err != nil {
105 t.Fatalf("Unexpected error: %v", err)
106 }
107 c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
108 Durable: "cons",
109 AckPolicy: jetstream.AckExplicitPolicy,
110 Description: "test consumer",
111 })
112 if err != nil {
113 t.Fatalf("Unexpected error: %v", err)
114 }
115 return srv, nc, js, c
116 }
117
118 t.Run("standard ack", func(t *testing.T) {
119 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
120 defer cancel()
121 srv, nc, js, c := setup(ctx, t)
122 defer shutdownJSServerAndRemoveStorage(t, srv)
123 defer nc.Close()
124
125 if _, err := js.Publish(ctx, "FOO.1", []byte("msg")); err != nil {
126 t.Fatalf("Unexpected error: %v", err)
127 }
128 msgs, err := c.Fetch(1)
129 if err != nil {
130 t.Fatalf("Unexpected error: %v", err)
131 }
132 msg := <-msgs.Messages()
133 if msg == nil {
134 t.Fatalf("No messages available")
135 }
136 if err := msgs.Error(); err != nil {
137 t.Fatalf("unexpected error during fetch: %v", err)
138 }
139 sub, err := nc.SubscribeSync(msg.Reply())
140 if err != nil {
141 t.Fatalf("Unexpected error: %v", err)
142 }
143
144 if err := msg.Ack(); err != nil {
145 t.Fatalf("Unexpected error: %v", err)
146 }

Callers

nothing calls this directly

Calls 15

New (Function) · 0.92
Connect (Method) · 0.80
Fatalf (Method) · 0.80
NextMsgWithContext (Method) · 0.80
RunBasicJetStreamServer (Function) · 0.70
CreateStream (Method) · 0.65
Publish (Method) · 0.65
Fetch (Method) · 0.65
Messages (Method) · 0.65
Error (Method) · 0.65

Tested by

no test coverage detected