MCPcopy
hub / github.com/nats-io/nats.go / TestPushConsumerConsume

Function TestPushConsumerConsume

jetstream/test/push_test.go:14–507  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

12)
13
14func TestPushConsumerConsume(t *testing.T) {
15 testSubject := "FOO.123"
16 testMsgs := []string{"m1", "m2", "m3", "m4", "m5"}
17 publishTestMsgs := func(t *testing.T, js jetstream.JetStream) {
18 for _, msg := range testMsgs {
19 if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil {
20 t.Fatalf("Unexpected error during publish: %s", err)
21 }
22 }
23 }
24
25 t.Run("no options", func(t *testing.T) {
26 srv := RunBasicJetStreamServer()
27 defer shutdownJSServerAndRemoveStorage(t, srv)
28 nc, err := nats.Connect(srv.ClientURL())
29 if err != nil {
30 t.Fatalf("Unexpected error: %v", err)
31 }
32
33 js, err := jetstream.New(nc)
34 if err != nil {
35 t.Fatalf("Unexpected error: %v", err)
36 }
37 defer nc.Close()
38
39 ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
40 defer cancel()
41 s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
42 if err != nil {
43 t.Fatalf("Unexpected error: %v", err)
44 }
45 c, err := s.CreatePushConsumer(ctx, jetstream.ConsumerConfig{
46 DeliverSubject: nats.NewInbox(),
47 AckPolicy: jetstream.AckExplicitPolicy,
48 })
49 if err != nil {
50 t.Fatalf("Unexpected error: %v", err)
51 }
52
53 msgs := make([]jetstream.Msg, 0)
54 wg := &sync.WaitGroup{}
55 wg.Add(len(testMsgs))
56 l, err := c.Consume(func(msg jetstream.Msg) {
57 msg.Ack()
58 msgs = append(msgs, msg)
59 wg.Done()
60 })
61 if err != nil {
62 t.Fatalf("Unexpected error: %v", err)
63 }
64 defer l.Stop()
65
66 publishTestMsgs(t, js)
67 wg.Wait()
68 if len(msgs) != len(testMsgs) {
69 t.Fatalf("Unexpected received message count; want %d; got %d", len(testMsgs), len(msgs))
70 }
71 for i, msg := range msgs {

Callers

nothing calls this directly

Calls 15

New (Function) · 0.92
ConsumeErrHandler (TypeAlias) · 0.92
Fatalf (Method) · 0.80
Connect (Method) · 0.80
NewInbox (Method) · 0.80
RunBasicJetStreamServer (Function) · 0.70
restartBasicJSServer (Function) · 0.70
Publish (Method) · 0.65
CreateStream (Method) · 0.65
CreatePushConsumer (Method) · 0.65
Add (Method) · 0.65

Tested by

no test coverage detected