MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamPullSubscribeFetchBatchErrOnReconnect

Function TestJetStreamPullSubscribeFetchBatchErrOnReconnect

test/js_test.go:11217–11249  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

11215}
11216
11217func TestJetStreamPullSubscribeFetchBatchErrOnReconnect(t *testing.T) {
11218 srv := RunBasicJetStreamServer()
11219 defer shutdownJSServerAndRemoveStorage(t, srv)
11220 nc, js := jsClient(t, srv)
11221 defer nc.Close()
11222 _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
11223 if err != nil {
11224 t.Fatalf("Unexpected error: %v", err)
11225 }
11226 sub, err := js.PullSubscribe("FOO.123", "bar")
11227 if err != nil {
11228 t.Fatalf("Unexpected error: %v", err)
11229 }
11230 defer sub.Unsubscribe()
11231 errs := make(chan error, 1)
11232 go func() {
11233 time.Sleep(100 * time.Millisecond)
11234 errs <- nc.ForceReconnect()
11235 }()
11236 msgs, err := sub.FetchBatch(1, nats.MaxWait(time.Second), nats.PullHeartbeat(100*time.Millisecond))
11237 if err != nil {
11238 t.Fatalf("Unexpected error: %v", err)
11239 }
11240 for range msgs.Messages() {
11241 t.Fatalf("Expected no messages, got one")
11242 }
11243 if !errors.Is(msgs.Error(), nats.ErrFetchDisconnected) {
11244 t.Fatalf("Expected error: %v; got: %v", nats.ErrFetchDisconnected, msgs.Error())
11245 }
11246 if err := <-errs; err != nil {
11247 t.Fatalf("Error on reconnect: %v", err)
11248 }
11249}
11250
11251func TestJetStreamSubscribeShortTimeoutWithContext(t *testing.T) {
11252 srv := RunBasicJetStreamServer()

Callers

nothing calls this directly

Calls 13

FatalfMethod · 0.80
UnsubscribeMethod · 0.80
ForceReconnectMethod · 0.80
FetchBatchMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
jsClientFunction · 0.70
AddStreamMethod · 0.65
PullSubscribeMethod · 0.65
MessagesMethod · 0.65
ErrorMethod · 0.65
CloseMethod · 0.45

Tested by

no test coverage detected