MCPcopy
hub / github.com/nats-io/nats.go / BenchmarkPubSubSpeed

Function BenchmarkPubSubSpeed

test/bench_test.go:45–84  ·  view source on GitHub ↗
(b *testing.B)

Source from the content-addressed store, hash-verified

43}
44
45func BenchmarkPubSubSpeed(b *testing.B) {
46 b.StopTimer()
47 s := RunDefaultServer()
48 defer s.Shutdown()
49 nc := NewDefaultConnection(b)
50 defer nc.Close()
51
52 ch := make(chan bool)
53
54 nc.SetErrorHandler(func(nc *nats.Conn, s *nats.Subscription, err error) {
55 b.Fatalf("Error : %v\n", err)
56 })
57
58 received := int32(0)
59
60 nc.Subscribe("foo", func(m *nats.Msg) {
61 if nr := atomic.AddInt32(&received, 1); nr >= int32(b.N) {
62 ch <- true
63 }
64 })
65
66 msg := []byte("Hello World")
67
68 b.StartTimer()
69
70 for i := 0; i < b.N; i++ {
71 if err := nc.Publish("foo", msg); err != nil {
72 b.Fatalf("Error in benchmark during Publish: %v\n", err)
73 }
74 }
75
76 // Make sure they are all processed.
77 err := WaitTime(ch, 10*time.Second)
78 if err != nil {
79 b.Fatal("Timed out waiting for messages")
80 } else if atomic.LoadInt32(&received) != int32(b.N) {
81 b.Fatalf("Received: %d, err:%v", received, nc.LastError())
82 }
83 b.StopTimer()
84}
85
86func BenchmarkAsyncSubscriptionCreationSpeed(b *testing.B) {
87 b.StopTimer()

Callers

nothing calls this directly

Calls 9

SetErrorHandlerMethod · 0.80
FatalfMethod · 0.80
RunDefaultServerFunction · 0.70
NewDefaultConnectionFunction · 0.70
WaitTimeFunction · 0.70
SubscribeMethod · 0.65
PublishMethod · 0.65
CloseMethod · 0.45
LastErrorMethod · 0.45

Tested by

no test coverage detected