MCPcopy
hub / github.com/nats-io/nats.go / TestQueueSubscriber

Function TestQueueSubscriber

test/basic_test.go:534–571  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

532}
533
534func TestQueueSubscriber(t *testing.T) {
535 s := RunDefaultServer()
536 defer s.Shutdown()
537 nc := NewDefaultConnection(t)
538 defer nc.Close()
539
540 s1, _ := nc.QueueSubscribeSync("foo", "bar")
541 s2, _ := nc.QueueSubscribeSync("foo", "bar")
542 omsg := []byte("Hello World")
543 nc.Publish("foo", omsg)
544 nc.Flush()
545 r1, _, _ := s1.Pending()
546 r2, _, _ := s2.Pending()
547 if (r1 + r2) != 1 {
548 t.Fatal("Received too many messages for multiple queue subscribers")
549 }
550 // Drain messages
551 s1.NextMsg(time.Second)
552 s2.NextMsg(time.Second)
553
554 total := 1000
555 for i := 0; i < total; i++ {
556 nc.Publish("foo", omsg)
557 }
558 nc.Flush()
559 v := uint(float32(total) * 0.15)
560 r1, _, _ = s1.Pending()
561 r2, _, _ = s2.Pending()
562 if r1+r2 != total {
563 t.Fatalf("Incorrect number of messages: %d vs %d", (r1 + r2), total)
564 }
565 expected := total / 2
566 d1 := uint(math.Abs(float64(expected - r1)))
567 d2 := uint(math.Abs(float64(expected - r2)))
568 if d1 > v || d2 > v {
569 t.Fatalf("Too much variance in totals: %d, %d > %d", d1, d2, v)
570 }
571}
572
573func TestReplyArg(t *testing.T) {
574 s := RunDefaultServer()

Callers

nothing calls this directly

Calls 9

PendingMethod · 0.80
NextMsgMethod · 0.80
FatalfMethod · 0.80
RunDefaultServerFunction · 0.70
NewDefaultConnectionFunction · 0.70
QueueSubscribeSyncMethod · 0.65
PublishMethod · 0.65
CloseMethod · 0.45
FlushMethod · 0.45

Tested by

no test coverage detected