(t *testing.T)
| 8437 | } |
| 8438 | |
| 8439 | func TestJetStreamPublishAsyncPerf(t *testing.T) { |
| 8440 | // Comment out below to run this benchmark. |
| 8441 | t.SkipNow() |
| 8442 | |
| 8443 | s := RunBasicJetStreamServer() |
| 8444 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 8445 | |
| 8446 | nc, err := nats.Connect(s.ClientURL()) |
| 8447 | if err != nil { |
| 8448 | t.Fatalf("Unexpected error: %v", err) |
| 8449 | } |
| 8450 | defer nc.Close() |
| 8451 | |
| 8452 | // 64 byte payload. |
| 8453 | msg := make([]byte, 64) |
| 8454 | rand.Read(msg) |
| 8455 | |
| 8456 | // Setup error handler. |
| 8457 | var errors uint32 |
| 8458 | errHandler := func(js nats.JetStream, originalMsg *nats.Msg, err error) { |
| 8459 | t.Logf("Got an async err: %v", err) |
| 8460 | atomic.AddUint32(&errors, 1) |
| 8461 | } |
| 8462 | |
| 8463 | js, err := nc.JetStream( |
| 8464 | nats.PublishAsyncErrHandler(errHandler), |
| 8465 | nats.PublishAsyncMaxPending(256), |
| 8466 | ) |
| 8467 | if err != nil { |
| 8468 | t.Fatalf("Unexpected error: %v", err) |
| 8469 | } |
| 8470 | |
| 8471 | if _, err := js.AddStream(&nats.StreamConfig{Name: "B"}); err != nil { |
| 8472 | t.Fatalf("Unexpected error: %v", err) |
| 8473 | } |
| 8474 | |
| 8475 | toSend := 1000000 |
| 8476 | start := time.Now() |
| 8477 | for i := 0; i < toSend; i++ { |
| 8478 | if _, err = js.PublishAsync("B", msg); err != nil { |
| 8479 | t.Fatalf("Unexpected error: %v", err) |
| 8480 | } |
| 8481 | } |
| 8482 | |
| 8483 | select { |
| 8484 | case <-js.PublishAsyncComplete(): |
| 8485 | if ne := atomic.LoadUint32(&errors); ne > 0 { |
| 8486 | t.Fatalf("Got unexpected errors publishing") |
| 8487 | } |
| 8488 | case <-time.After(5 * time.Second): |
| 8489 | t.Fatalf("Did not receive completion signal") |
| 8490 | } |
| 8491 | |
| 8492 | tt := time.Since(start) |
| 8493 | fmt.Printf("Took %v to send %d msgs\n", tt, toSend) |
| 8494 | fmt.Printf("%.0f msgs/sec\n\n", float64(toSend)/tt.Seconds()) |
| 8495 | } |
| 8496 |
nothing calls this directly
no test coverage detected