(t *testing.T)
| 8495 | } |
| 8496 | |
| 8497 | func TestPublishAsyncRetry(t *testing.T) { |
| 8498 | tests := []struct { |
| 8499 | name string |
| 8500 | pubOpts []nats.PubOpt |
| 8501 | ackError error |
| 8502 | pubErr error |
| 8503 | }{ |
| 8504 | { |
| 8505 | name: "retry until stream is ready", |
| 8506 | pubOpts: []nats.PubOpt{ |
| 8507 | nats.RetryAttempts(10), |
| 8508 | nats.RetryWait(100 * time.Millisecond), |
| 8509 | }, |
| 8510 | }, |
| 8511 | { |
| 8512 | name: "fail after max retries", |
| 8513 | pubOpts: []nats.PubOpt{ |
| 8514 | nats.RetryAttempts(2), |
| 8515 | nats.RetryWait(50 * time.Millisecond), |
| 8516 | }, |
| 8517 | ackError: nats.ErrNoResponders, |
| 8518 | }, |
| 8519 | { |
| 8520 | name: "no retries", |
| 8521 | pubOpts: nil, |
| 8522 | ackError: nats.ErrNoResponders, |
| 8523 | }, |
| 8524 | { |
| 8525 | name: "invalid retry attempts", |
| 8526 | pubOpts: []nats.PubOpt{ |
| 8527 | nats.RetryAttempts(-1), |
| 8528 | }, |
| 8529 | pubErr: nats.ErrInvalidArg, |
| 8530 | }, |
| 8531 | } |
| 8532 | |
| 8533 | for _, test := range tests { |
| 8534 | t.Run(test.name, func(t *testing.T) { |
| 8535 | s := RunBasicJetStreamServer() |
| 8536 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 8537 | |
| 8538 | nc, err := nats.Connect(s.ClientURL()) |
| 8539 | if err != nil { |
| 8540 | t.Fatalf("Unexpected error: %v", err) |
| 8541 | } |
| 8542 | |
| 8543 | // set max pending to 1 so that we can test if retries don't cause stall |
| 8544 | js, err := nc.JetStream(nats.PublishAsyncMaxPending(1)) |
| 8545 | if err != nil { |
| 8546 | t.Fatalf("Unexpected error: %v", err) |
| 8547 | } |
| 8548 | defer nc.Close() |
| 8549 | |
| 8550 | test.pubOpts = append(test.pubOpts, nats.StallWait(1*time.Nanosecond)) |
| 8551 | ack, err := js.PublishAsync("foo", []byte("hello"), test.pubOpts...) |
| 8552 | if !errors.Is(err, test.pubErr) { |
| 8553 | t.Fatalf("Expected error: %v; got: %v", test.pubErr, err) |
| 8554 | } |
nothing calls this directly
no test coverage detected