(t *testing.T)
| 8204 | } |
| 8205 | |
| 8206 | func TestPublishAsyncResetPendingOnReconnect(t *testing.T) { |
| 8207 | s := RunBasicJetStreamServer() |
| 8208 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 8209 | |
| 8210 | nc, js := jsClient(t, s) |
| 8211 | defer nc.Close() |
| 8212 | |
| 8213 | // Now create a stream and expect a PubAck from <-OK(). |
| 8214 | if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"FOO"}}); err != nil { |
| 8215 | t.Fatalf("Unexpected error: %v", err) |
| 8216 | } |
| 8217 | |
| 8218 | errs := make(chan error, 1) |
| 8219 | done := make(chan struct{}, 1) |
| 8220 | acks := make(chan nats.PubAckFuture, 100) |
| 8221 | go func() { |
| 8222 | for i := 0; i < 100; i++ { |
| 8223 | if ack, err := js.PublishAsync("FOO", []byte("hello")); err != nil { |
| 8224 | errs <- err |
| 8225 | return |
| 8226 | } else { |
| 8227 | acks <- ack |
| 8228 | } |
| 8229 | } |
| 8230 | close(acks) |
| 8231 | done <- struct{}{} |
| 8232 | }() |
| 8233 | select { |
| 8234 | case <-done: |
| 8235 | case err := <-errs: |
| 8236 | t.Fatalf("Unexpected error during publish: %v", err) |
| 8237 | case <-time.After(5 * time.Second): |
| 8238 | t.Fatalf("Did not receive completion signal") |
| 8239 | } |
| 8240 | s.Shutdown() |
| 8241 | time.Sleep(100 * time.Millisecond) |
| 8242 | if pending := js.PublishAsyncPending(); pending != 0 { |
| 8243 | t.Fatalf("Expected no pending messages after server shutdown; got: %d", pending) |
| 8244 | } |
| 8245 | s = RunBasicJetStreamServer() |
| 8246 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 8247 | |
| 8248 | for ack := range acks { |
| 8249 | select { |
| 8250 | case <-ack.Ok(): |
| 8251 | case err := <-ack.Err(): |
| 8252 | if !errors.Is(err, nats.ErrDisconnected) && !errors.Is(err, nats.ErrNoResponders) { |
| 8253 | t.Fatalf("Expected error: %v or %v; got: %v", nats.ErrDisconnected, nats.ErrNoResponders, err) |
| 8254 | } |
| 8255 | case <-time.After(5 * time.Second): |
| 8256 | t.Fatalf("Did not receive completion signal") |
| 8257 | } |
| 8258 | } |
| 8259 | } |
| 8260 | |
| 8261 | func TestPublishAsyncAckTimeout(t *testing.T) { |
| 8262 | s := RunBasicJetStreamServer() |
nothing calls this directly
no test coverage detected