MCPcopy
hub / github.com/nats-io/nats.go / TestDrainSlowSubscriber

Function TestDrainSlowSubscriber

test/drain_test.go:196–237  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

194}
195
196func TestDrainSlowSubscriber(t *testing.T) {
197 s := RunDefaultServer()
198 defer s.Shutdown()
199 nc := NewDefaultConnection(t)
200 defer nc.Close()
201
202 received := int32(0)
203
204 sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {
205 atomic.AddInt32(&received, 1)
206 time.Sleep(100 * time.Millisecond)
207 })
208 if err != nil {
209 t.Fatalf("Error creating subscription; %v", err)
210 }
211
212 total := 10
213 for i := 0; i < total; i++ {
214 nc.Publish("foo", []byte("Slow Slow"))
215 }
216 nc.Flush()
217
218 pmsgs, _, _ := sub.Pending()
219 if pmsgs != total && pmsgs != total-1 {
220 t.Fatalf("Expected most messages to be pending, but got %d vs %d", pmsgs, total)
221 }
222 sub.Drain()
223
224 // Should take a second or so to drain away.
225 waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
226 // Wait for it to become invalid. Once drained it is unsubscribed.
227 _, _, err := sub.Pending()
228 if err != nats.ErrBadSubscription {
229 return errors.New("Still valid")
230 }
231 r := int(atomic.LoadInt32(&received))
232 if r != total {
233 t.Fatalf("Did not receive all messages, got %d vs %d", r, total)
234 }
235 return nil
236 })
237}
238
239func TestDrainConnection(t *testing.T) {
240 s := RunDefaultServer()

Callers

nothing calls this directly

Calls 10

waitFor (Function) · 0.85
Fatalf (Method) · 0.80
Pending (Method) · 0.80
RunDefaultServer (Function) · 0.70
NewDefaultConnection (Function) · 0.70
Subscribe (Method) · 0.65
Publish (Method) · 0.65
Drain (Method) · 0.65
Close (Method) · 0.45
Flush (Method) · 0.45

Tested by

no test coverage detected