MCPcopy
hub / github.com/nats-io/nats.go / TestPullConsumerConsume

Function TestPullConsumerConsume

jetstream/test/pull_test.go:2200–3317  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

2198}
2199
2200func TestPullConsumerConsume(t *testing.T) {
2201 testSubject := "FOO.123"
2202 testMsgs := []string{"m1", "m2", "m3", "m4", "m5"}
2203 publishTestMsgs := func(t *testing.T, js jetstream.JetStream) {
2204 for _, msg := range testMsgs {
2205 if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil {
2206 t.Fatalf("Unexpected error during publish: %s", err)
2207 }
2208 }
2209 }
2210
2211 t.Run("no options", func(t *testing.T) {
2212 srv := RunBasicJetStreamServer()
2213 defer shutdownJSServerAndRemoveStorage(t, srv)
2214 nc, err := nats.Connect(srv.ClientURL())
2215 if err != nil {
2216 t.Fatalf("Unexpected error: %v", err)
2217 }
2218
2219 js, err := jetstream.New(nc)
2220 if err != nil {
2221 t.Fatalf("Unexpected error: %v", err)
2222 }
2223 defer nc.Close()
2224
2225 ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
2226 defer cancel()
2227 s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
2228 if err != nil {
2229 t.Fatalf("Unexpected error: %v", err)
2230 }
2231 c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy})
2232 if err != nil {
2233 t.Fatalf("Unexpected error: %v", err)
2234 }
2235
2236 msgs := make([]jetstream.Msg, 0)
2237 wg := &sync.WaitGroup{}
2238 wg.Add(len(testMsgs))
2239 l, err := c.Consume(func(msg jetstream.Msg) {
2240 msgs = append(msgs, msg)
2241 wg.Done()
2242 })
2243 if err != nil {
2244 t.Fatalf("Unexpected error: %v", err)
2245 }
2246 defer l.Stop()
2247
2248 publishTestMsgs(t, js)
2249 wg.Wait()
2250 if len(msgs) != len(testMsgs) {
2251 t.Fatalf("Unexpected received message count; want %d; got %d", len(testMsgs), len(msgs))
2252 }
2253 for i, msg := range msgs {
2254 if string(msg.Data()) != testMsgs[i] {
2255 t.Fatalf("Invalid msg on index %d; expected: %s; got: %s", i, testMsgs[i], string(msg.Data()))
2256 }
2257 }

Callers

nothing calls this directly

Calls 15

New · Function · 0.92
PullMaxMessages · TypeAlias · 0.92
ConsumeErrHandler · TypeAlias · 0.92
PullMaxBytes · TypeAlias · 0.92
StopAfter · TypeAlias · 0.92
PullExpiry · TypeAlias · 0.92
PullHeartbeat · TypeAlias · 0.92
Fatalf · Method · 0.80
Connect · Method · 0.80
Pending · Method · 0.80
Unsubscribe · Method · 0.80

Tested by

no test coverage detected