MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamPublishAsyncPerf

Function TestJetStreamPublishAsyncPerf

test/js_test.go:8439–8495  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

8437}
8438
8439func TestJetStreamPublishAsyncPerf(t *testing.T) {
8440 // Comment out below to run this benchmark.
8441 t.SkipNow()
8442
8443 s := RunBasicJetStreamServer()
8444 defer shutdownJSServerAndRemoveStorage(t, s)
8445
8446 nc, err := nats.Connect(s.ClientURL())
8447 if err != nil {
8448 t.Fatalf("Unexpected error: %v", err)
8449 }
8450 defer nc.Close()
8451
8452 // 64 byte payload.
8453 msg := make([]byte, 64)
8454 rand.Read(msg)
8455
8456 // Setup error handler.
8457 var errors uint32
8458 errHandler := func(js nats.JetStream, originalMsg *nats.Msg, err error) {
8459 t.Logf("Got an async err: %v", err)
8460 atomic.AddUint32(&errors, 1)
8461 }
8462
8463 js, err := nc.JetStream(
8464 nats.PublishAsyncErrHandler(errHandler),
8465 nats.PublishAsyncMaxPending(256),
8466 )
8467 if err != nil {
8468 t.Fatalf("Unexpected error: %v", err)
8469 }
8470
8471 if _, err := js.AddStream(&nats.StreamConfig{Name: "B"}); err != nil {
8472 t.Fatalf("Unexpected error: %v", err)
8473 }
8474
8475 toSend := 1000000
8476 start := time.Now()
8477 for i := 0; i < toSend; i++ {
8478 if _, err = js.PublishAsync("B", msg); err != nil {
8479 t.Fatalf("Unexpected error: %v", err)
8480 }
8481 }
8482
8483 select {
8484 case <-js.PublishAsyncComplete():
8485 if ne := atomic.LoadUint32(&errors); ne > 0 {
8486 t.Fatalf("Got unexpected errors publishing")
8487 }
8488 case <-time.After(5 * time.Second):
8489 t.Fatalf("Did not receive completion signal")
8490 }
8491
8492 tt := time.Since(start)
8493 fmt.Printf("Took %v to send %d msgs\n", tt, toSend)
8494 fmt.Printf("%.0f msgs/sec\n\n", float64(toSend)/tt.Seconds())
8495}
8496

Callers

nothing calls this directly

Calls 11

ConnectMethod · 0.80
FatalfMethod · 0.80
JetStreamMethod · 0.80
SecondsMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
AddStreamMethod · 0.65
PublishAsyncMethod · 0.65
PublishAsyncCompleteMethod · 0.65
CloseMethod · 0.45
ReadMethod · 0.45

Tested by

no test coverage detected