(t *testing.T)
| 8322 | } |
| 8323 | |
| 8324 | func TestPublishAsyncClearStall(t *testing.T) { |
| 8325 | s := RunBasicJetStreamServer() |
| 8326 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 8327 | |
| 8328 | nc, err := nats.Connect(s.ClientURL()) |
| 8329 | if err != nil { |
| 8330 | t.Fatalf("Unexpected error: %v", err) |
| 8331 | } |
| 8332 | |
| 8333 | js, err := nc.JetStream( |
| 8334 | nats.PublishAsyncTimeout(500*time.Millisecond), |
| 8335 | nats.PublishAsyncMaxPending(100)) |
| 8336 | if err != nil { |
| 8337 | t.Fatalf("Unexpected error: %v", err) |
| 8338 | } |
| 8339 | defer nc.Close() |
| 8340 | |
| 8341 | // use stream with no acks to test stalling |
| 8342 | _, err = js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, NoAck: true}) |
| 8343 | if err != nil { |
| 8344 | t.Fatalf("Unexpected error: %v", err) |
| 8345 | } |
| 8346 | |
| 8347 | for range 100 { |
| 8348 | _, err := js.PublishAsync("FOO.A", []byte("hello"), nats.StallWait(1*time.Nanosecond)) |
| 8349 | if err != nil { |
| 8350 | t.Fatalf("Unexpected error: %v", err) |
| 8351 | } |
| 8352 | } |
| 8353 | // after publishing 100 messages, next one should fail with ErrTooManyStalledMsgs |
| 8354 | _, err = js.PublishAsync("FOO.A", []byte("hello"), nats.StallWait(50*time.Millisecond)) |
| 8355 | if !errors.Is(err, nats.ErrTooManyStalledMsgs) { |
| 8356 | t.Fatalf("Expected error: %v; got: %v", nats.ErrTooManyStalledMsgs, err) |
| 8357 | } |
| 8358 | |
| 8359 | // after publish timeout all pending messages should be cleared |
| 8360 | // and we should be able to publish again |
| 8361 | select { |
| 8362 | case <-js.PublishAsyncComplete(): |
| 8363 | case <-time.After(2 * time.Second): |
| 8364 | t.Fatalf("Did not receive completion signal") |
| 8365 | } |
| 8366 | |
| 8367 | if _, err = js.PublishAsync("FOO.A", []byte("hello")); err != nil { |
| 8368 | t.Fatalf("Unexpected error: %v", err) |
| 8369 | } |
| 8370 | if js.PublishAsyncPending() != 1 { |
| 8371 | t.Fatalf("Expected 1 pending message; got: %d", js.PublishAsyncPending()) |
| 8372 | } |
| 8373 | } |
| 8374 | |
| 8375 | func TestPublishAsyncRetryInErrHandler(t *testing.T) { |
| 8376 | s := RunBasicJetStreamServer() |
nothing calls this directly
no test coverage detected