MCPcopy
hub / github.com/nats-io/nats.go / TestNoRaceJetStreamConsumerSlowConsumer

Function TestNoRaceJetStreamConsumerSlowConsumer

test/norace_test.go:106–174  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

104}
105
106func TestNoRaceJetStreamConsumerSlowConsumer(t *testing.T) {
107 // This test fails many times, need to look harder at the imbalance.
108 t.SkipNow()
109
110 s := RunServerOnPort(-1)
111 defer shutdownJSServerAndRemoveStorage(t, s)
112
113 if err := s.EnableJetStream(nil); err != nil {
114 t.Fatalf("Expected no error, got %v", err)
115 }
116
117 nc, js := jsClient(t, s)
118 defer nc.Close()
119
120 var err error
121
122 _, err = js.AddStream(&nats.StreamConfig{
123 Name: "PENDING_TEST",
124 Subjects: []string{"js.p"},
125 Storage: nats.MemoryStorage,
126 })
127 if err != nil {
128 t.Fatalf("stream create failed: %v", err)
129 }
130
131 // Override default handler for test.
132 nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})
133
134 // Queue up 1M small messages.
135 toSend := uint64(1000000)
136 for i := uint64(0); i < toSend; i++ {
137 nc.Publish("js.p", []byte("ok"))
138 }
139 nc.Flush()
140
141 str, err := js.StreamInfo("PENDING_TEST")
142 if err != nil {
143 t.Fatal(err)
144 }
145
146 if nm := str.State.Msgs; nm != toSend {
147 t.Fatalf("Expected to have stored all %d msgs, got only %d", toSend, nm)
148 }
149
150 var received uint64
151 done := make(chan bool, 1)
152
153 js.Subscribe("js.p", func(m *nats.Msg) {
154 received++
155 if received >= toSend {
156 done <- true
157 }
158 meta, err := m.Metadata()
159 if err != nil {
160 t.Fatalf("could not get message metadata: %s", err)
161 }
162 if meta.Sequence.Stream != received {
163 t.Errorf("Missed a sequence, was expecting %d but got %d, last error: '%v'", received, meta.Sequence.Stream, nc.LastError())

Callers

nothing calls this directly

Calls 15

Fatalf · Method · 0.80
SetErrorHandler · Method · 0.80
Errorf · Method · 0.80
RunServerOnPort · Function · 0.70
jsClient · Function · 0.70
AddStream · Method · 0.65
Publish · Method · 0.65
StreamInfo · Method · 0.65
Subscribe · Method · 0.65
Metadata · Method · 0.65
Ack · Method · 0.65

Tested by

no test coverage detected