(b *testing.B)
| 43 | } |
| 44 | |
| 45 | func BenchmarkPubSubSpeed(b *testing.B) { |
| 46 | b.StopTimer() |
| 47 | s := RunDefaultServer() |
| 48 | defer s.Shutdown() |
| 49 | nc := NewDefaultConnection(b) |
| 50 | defer nc.Close() |
| 51 | |
| 52 | ch := make(chan bool) |
| 53 | |
| 54 | nc.SetErrorHandler(func(nc *nats.Conn, s *nats.Subscription, err error) { |
| 55 | b.Fatalf("Error : %v\n", err) |
| 56 | }) |
| 57 | |
| 58 | received := int32(0) |
| 59 | |
| 60 | nc.Subscribe("foo", func(m *nats.Msg) { |
| 61 | if nr := atomic.AddInt32(&received, 1); nr >= int32(b.N) { |
| 62 | ch <- true |
| 63 | } |
| 64 | }) |
| 65 | |
| 66 | msg := []byte("Hello World") |
| 67 | |
| 68 | b.StartTimer() |
| 69 | |
| 70 | for i := 0; i < b.N; i++ { |
| 71 | if err := nc.Publish("foo", msg); err != nil { |
| 72 | b.Fatalf("Error in benchmark during Publish: %v\n", err) |
| 73 | } |
| 74 | } |
| 75 | |
| 76 | // Make sure they are all processed. |
| 77 | err := WaitTime(ch, 10*time.Second) |
| 78 | if err != nil { |
| 79 | b.Fatal("Timed out waiting for messages") |
| 80 | } else if atomic.LoadInt32(&received) != int32(b.N) { |
| 81 | b.Fatalf("Received: %d, err:%v", received, nc.LastError()) |
| 82 | } |
| 83 | b.StopTimer() |
| 84 | } |
| 85 | |
| 86 | func BenchmarkAsyncSubscriptionCreationSpeed(b *testing.B) { |
| 87 | b.StopTimer() |
nothing calls this directly
no test coverage detected