MCPcopy
hub / github.com/nats-io/nats.go / TestSlowSubscriber

Function TestSlowSubscriber

test/sub_test.go:452–480  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

450}
451
452func TestSlowSubscriber(t *testing.T) {
453 s := RunDefaultServer()
454 defer s.Shutdown()
455
456 nc := NewDefaultConnection(t)
457 defer nc.Close()
458
459 // Override default handler for test.
460 nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})
461
462 sub, _ := nc.SubscribeSync("foo")
463 sub.SetPendingLimits(100, 1024)
464
465 for range 200 {
466 nc.Publish("foo", []byte("Hello"))
467 }
468 timeout := 5 * time.Second
469 start := time.Now()
470 nc.FlushTimeout(timeout)
471 elapsed := time.Since(start)
472 if elapsed >= timeout {
473 t.Fatalf("Flush did not return before timeout: %d > %d", elapsed, timeout)
474 }
475 // Make sure NextMsg returns an error to indicate slow consumer
476 _, err := sub.NextMsg(200 * time.Millisecond)
477 if err == nil {
478 t.Fatalf("NextMsg did not return an error")
479 }
480}
481
482func TestSlowChanSubscriber(t *testing.T) {
483 s := RunDefaultServer()

Callers

nothing calls this directly

Calls 10

SetErrorHandlerMethod · 0.80
SetPendingLimitsMethod · 0.80
FatalfMethod · 0.80
NextMsgMethod · 0.80
RunDefaultServerFunction · 0.70
NewDefaultConnectionFunction · 0.70
SubscribeSyncMethod · 0.65
PublishMethod · 0.65
CloseMethod · 0.45
FlushTimeoutMethod · 0.45

Tested by

no test coverage detected