MCPcopy
hub / github.com/nats-io/nats.go / TestSlowAsyncSubscriber

Function TestSlowAsyncSubscriber

test/sub_test.go:508–570  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

506}
507
508func TestSlowAsyncSubscriber(t *testing.T) {
509 s := RunDefaultServer()
510 defer s.Shutdown()
511
512 nc := NewDefaultConnection(t)
513 defer nc.Close()
514
515 // Override default handler for test.
516 nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})
517
518 bch := make(chan bool)
519
520 sub, _ := nc.Subscribe("foo", func(m *nats.Msg) {
521 // block to back us up..
522 <-bch
523 // Avoid repeated calls that would then block again
524 m.Sub.Unsubscribe()
525 })
526 // Make sure these are the defaults
527 pm, pb, _ := sub.PendingLimits()
528 if pm != nats.DefaultSubPendingMsgsLimit {
529 t.Fatalf("Pending limit for number of msgs incorrect, expected %d, got %d\n", nats.DefaultSubPendingMsgsLimit, pm)
530 }
531 if pb != nats.DefaultSubPendingBytesLimit {
532 t.Fatalf("Pending limit for number of bytes incorrect, expected %d, got %d\n", nats.DefaultSubPendingBytesLimit, pb)
533 }
534
535 // Set new limits
536 pml := 100
537 pbl := 1024 * 1024
538
539 sub.SetPendingLimits(pml, pbl)
540
541 // Make sure the set is correct
542 pm, pb, _ = sub.PendingLimits()
543 if pm != pml {
544 t.Fatalf("Pending limit for number of msgs incorrect, expected %d, got %d\n", pml, pm)
545 }
546 if pb != pbl {
547 t.Fatalf("Pending limit for number of bytes incorrect, expected %d, got %d\n", pbl, pb)
548 }
549
550 for range int(pml) + 100 {
551 nc.Publish("foo", []byte("Hello"))
552 }
553
554 timeout := 5 * time.Second
555 start := time.Now()
556 err := nc.FlushTimeout(timeout)
557 elapsed := time.Since(start)
558 if elapsed >= timeout {
559 t.Fatalf("Flush did not return before timeout")
560 }
561 // We want flush to work, so expect no error for it.
562 if err != nil {
563 t.Fatalf("Expected no error from Flush()\n")
564 }
565 if nc.LastError() != nats.ErrSlowConsumer {

Callers

nothing calls this directly

Calls 12

SetErrorHandler (Method) · 0.80
Unsubscribe (Method) · 0.80
PendingLimits (Method) · 0.80
Fatalf (Method) · 0.80
SetPendingLimits (Method) · 0.80
RunDefaultServer (Function) · 0.70
NewDefaultConnection (Function) · 0.70
Subscribe (Method) · 0.65
Publish (Method) · 0.65
Close (Method) · 0.45
FlushTimeout (Method) · 0.45
LastError (Method) · 0.45

Tested by

no test coverage detected