MCPcopy
hub / github.com/nats-io/nats.go / TestOrderedConsumerConsume

Function TestOrderedConsumerConsume

jetstream/test/ordered_test.go:30–701  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

28)
29
30func TestOrderedConsumerConsume(t *testing.T) {
31 testSubject := "FOO.123"
32 testMsgs := []string{"m1", "m2", "m3", "m4", "m5"}
33 publishTestMsgs := func(t *testing.T, js jetstream.JetStream) {
34 for _, msg := range testMsgs {
35 if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil {
36 t.Fatalf("Unexpected error during publish: %s", err)
37 }
38 }
39 }
40 t.Run("base usage, delete consumer", func(t *testing.T) {
41 srv := RunBasicJetStreamServer()
42 defer shutdownJSServerAndRemoveStorage(t, srv)
43 nc, err := nats.Connect(srv.ClientURL())
44 if err != nil {
45 t.Fatalf("Unexpected error: %v", err)
46 }
47
48 js, err := jetstream.New(nc)
49 if err != nil {
50 t.Fatalf("Unexpected error: %v", err)
51 }
52 defer nc.Close()
53
54 ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
55 defer cancel()
56 s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
57 if err != nil {
58 t.Fatalf("Unexpected error: %v", err)
59 }
60 c, err := s.OrderedConsumer(ctx, jetstream.OrderedConsumerConfig{})
61 if err != nil {
62 t.Fatalf("Unexpected error: %v", err)
63 }
64
65 msgs := make([]jetstream.Msg, 0)
66 wg := &sync.WaitGroup{}
67 wg.Add(len(testMsgs))
68 l, err := c.Consume(func(msg jetstream.Msg) {
69 msgs = append(msgs, msg)
70 wg.Done()
71 })
72 if err != nil {
73 t.Fatalf("Unexpected error: %v", err)
74 }
75
76 publishTestMsgs(t, js)
77 wg.Wait()
78
79 name := c.CachedInfo().Name
80 if err := s.DeleteConsumer(ctx, name); err != nil {
81 t.Fatal(err)
82 }
83 wg.Add(len(testMsgs))
84 publishTestMsgs(t, js)
85 wg.Wait()
86
87 l.Stop()

Callers

nothing calls this directly

Calls 15

NewFunction · 0.92
ConsumeErrHandlerTypeAlias · 0.92
StopAfterTypeAlias · 0.92
PullMaxMessagesTypeAlias · 0.92
FatalfMethod · 0.80
ConnectMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
restartBasicJSServerFunction · 0.70
PublishMethod · 0.65
CreateStreamMethod · 0.65
OrderedConsumerMethod · 0.65

Tested by

no test coverage detected