MCPcopy
hub / github.com/nats-io/nats.go / TestPublishAsyncClearStall

Function TestPublishAsyncClearStall

test/js_test.go:8324–8373  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

8322}
8323
8324func TestPublishAsyncClearStall(t *testing.T) {
8325 s := RunBasicJetStreamServer()
8326 defer shutdownJSServerAndRemoveStorage(t, s)
8327
8328 nc, err := nats.Connect(s.ClientURL())
8329 if err != nil {
8330 t.Fatalf("Unexpected error: %v", err)
8331 }
8332
8333 js, err := nc.JetStream(
8334 nats.PublishAsyncTimeout(500*time.Millisecond),
8335 nats.PublishAsyncMaxPending(100))
8336 if err != nil {
8337 t.Fatalf("Unexpected error: %v", err)
8338 }
8339 defer nc.Close()
8340
8341 // use stream with no acks to test stalling
8342 _, err = js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, NoAck: true})
8343 if err != nil {
8344 t.Fatalf("Unexpected error: %v", err)
8345 }
8346
8347 for range 100 {
8348 _, err := js.PublishAsync("FOO.A", []byte("hello"), nats.StallWait(1*time.Nanosecond))
8349 if err != nil {
8350 t.Fatalf("Unexpected error: %v", err)
8351 }
8352 }
8353 // after publishing 100 messages, next one should fail with ErrTooManyStalledMsgs
8354 _, err = js.PublishAsync("FOO.A", []byte("hello"), nats.StallWait(50*time.Millisecond))
8355 if !errors.Is(err, nats.ErrTooManyStalledMsgs) {
8356 t.Fatalf("Expected error: %v; got: %v", nats.ErrTooManyStalledMsgs, err)
8357 }
8358
8359 // after publish timeout all pending messages should be cleared
8360 // and we should be able to publish again
8361 select {
8362 case <-js.PublishAsyncComplete():
8363 case <-time.After(2 * time.Second):
8364 t.Fatalf("Did not receive completion signal")
8365 }
8366
8367 if _, err = js.PublishAsync("FOO.A", []byte("hello")); err != nil {
8368 t.Fatalf("Unexpected error: %v", err)
8369 }
8370 if js.PublishAsyncPending() != 1 {
8371 t.Fatalf("Expected 1 pending message; got: %d", js.PublishAsyncPending())
8372 }
8373}
8374
8375func TestPublishAsyncRetryInErrHandler(t *testing.T) {
8376 s := RunBasicJetStreamServer()

Callers

nothing calls this directly

Calls 11

ConnectMethod · 0.80
FatalfMethod · 0.80
JetStreamMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
AddStreamMethod · 0.65
PublishAsyncMethod · 0.65
PublishAsyncCompleteMethod · 0.65
PublishAsyncPendingMethod · 0.65
CloseMethod · 0.45
IsMethod · 0.45

Tested by

no test coverage detected