MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamPullSubscribeFetchErrOnReconnect

Function TestJetStreamPullSubscribeFetchErrOnReconnect

test/js_test.go:11189–11215  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

11187}
11188
11189func TestJetStreamPullSubscribeFetchErrOnReconnect(t *testing.T) {
11190 srv := RunBasicJetStreamServer()
11191 defer shutdownJSServerAndRemoveStorage(t, srv)
11192 nc, js := jsClient(t, srv)
11193 defer nc.Close()
11194 _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
11195 if err != nil {
11196 t.Fatalf("Unexpected error: %v", err)
11197 }
11198 sub, err := js.PullSubscribe("FOO.123", "bar")
11199 if err != nil {
11200 t.Fatalf("Unexpected error: %v", err)
11201 }
11202 defer sub.Unsubscribe()
11203 errs := make(chan error, 1)
11204 go func() {
11205 time.Sleep(100 * time.Millisecond)
11206 errs <- nc.ForceReconnect()
11207 }()
11208 _, err = sub.Fetch(1, nats.MaxWait(time.Second))
11209 if !errors.Is(err, nats.ErrFetchDisconnected) {
11210 t.Fatalf("Expected error: %v; got: %v", nats.ErrFetchDisconnected, err)
11211 }
11212 if err := <-errs; err != nil {
11213 t.Fatalf("Error on reconnect: %v", err)
11214 }
11215}
11216
11217func TestJetStreamPullSubscribeFetchBatchErrOnReconnect(t *testing.T) {
11218 srv := RunBasicJetStreamServer()

Callers

nothing calls this directly

Calls 11

Fatalf · Method · 0.80
Unsubscribe · Method · 0.80
ForceReconnect · Method · 0.80
RunBasicJetStreamServer · Function · 0.70
jsClient · Function · 0.70
AddStream · Method · 0.65
PullSubscribe · Method · 0.65
Fetch · Method · 0.65
Close · Method · 0.45
Is · Method · 0.45

Tested by

no test coverage detected