MCPcopy
hub / github.com/nats-io/nats.go / TestPublishAsyncResetPendingOnReconnect

Function TestPublishAsyncResetPendingOnReconnect

test/js_test.go:8206–8259  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

8204}
8205
8206func TestPublishAsyncResetPendingOnReconnect(t *testing.T) {
8207 s := RunBasicJetStreamServer()
8208 defer shutdownJSServerAndRemoveStorage(t, s)
8209
8210 nc, js := jsClient(t, s)
8211 defer nc.Close()
8212
8213 // Now create a stream and expect a PubAck from <-OK().
8214 if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"FOO"}}); err != nil {
8215 t.Fatalf("Unexpected error: %v", err)
8216 }
8217
8218 errs := make(chan error, 1)
8219 done := make(chan struct{}, 1)
8220 acks := make(chan nats.PubAckFuture, 100)
8221 go func() {
8222 for i := 0; i < 100; i++ {
8223 if ack, err := js.PublishAsync("FOO", []byte("hello")); err != nil {
8224 errs <- err
8225 return
8226 } else {
8227 acks <- ack
8228 }
8229 }
8230 close(acks)
8231 done <- struct{}{}
8232 }()
8233 select {
8234 case <-done:
8235 case err := <-errs:
8236 t.Fatalf("Unexpected error during publish: %v", err)
8237 case <-time.After(5 * time.Second):
8238 t.Fatalf("Did not receive completion signal")
8239 }
8240 s.Shutdown()
8241 time.Sleep(100 * time.Millisecond)
8242 if pending := js.PublishAsyncPending(); pending != 0 {
8243 t.Fatalf("Expected no pending messages after server shutdown; got: %d", pending)
8244 }
8245 s = RunBasicJetStreamServer()
8246 defer shutdownJSServerAndRemoveStorage(t, s)
8247
8248 for ack := range acks {
8249 select {
8250 case <-ack.Ok():
8251 case err := <-ack.Err():
8252 if !errors.Is(err, nats.ErrDisconnected) && !errors.Is(err, nats.ErrNoResponders) {
8253 t.Fatalf("Expected error: %v or %v; got: %v", nats.ErrDisconnected, nats.ErrNoResponders, err)
8254 }
8255 case <-time.After(5 * time.Second):
8256 t.Fatalf("Did not receive completion signal")
8257 }
8258 }
8259}
8260
8261func TestPublishAsyncAckTimeout(t *testing.T) {
8262 s := RunBasicJetStreamServer()

Callers

nothing calls this directly

Calls 11

FatalfMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
jsClientFunction · 0.70
AddStreamMethod · 0.65
PublishAsyncMethod · 0.65
PublishAsyncPendingMethod · 0.65
OkMethod · 0.65
ErrMethod · 0.65
CloseMethod · 0.45
IsMethod · 0.45

Tested by

no test coverage detected