(t *testing.T)
| 11215 | } |
| 11216 | |
| 11217 | func TestJetStreamPullSubscribeFetchBatchErrOnReconnect(t *testing.T) { |
| 11218 | srv := RunBasicJetStreamServer() |
| 11219 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 11220 | nc, js := jsClient(t, srv) |
| 11221 | defer nc.Close() |
| 11222 | _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) |
| 11223 | if err != nil { |
| 11224 | t.Fatalf("Unexpected error: %v", err) |
| 11225 | } |
| 11226 | sub, err := js.PullSubscribe("FOO.123", "bar") |
| 11227 | if err != nil { |
| 11228 | t.Fatalf("Unexpected error: %v", err) |
| 11229 | } |
| 11230 | defer sub.Unsubscribe() |
| 11231 | errs := make(chan error, 1) |
| 11232 | go func() { |
| 11233 | time.Sleep(100 * time.Millisecond) |
| 11234 | errs <- nc.ForceReconnect() |
| 11235 | }() |
| 11236 | msgs, err := sub.FetchBatch(1, nats.MaxWait(time.Second), nats.PullHeartbeat(100*time.Millisecond)) |
| 11237 | if err != nil { |
| 11238 | t.Fatalf("Unexpected error: %v", err) |
| 11239 | } |
| 11240 | for range msgs.Messages() { |
| 11241 | t.Fatalf("Expected no messages, got one") |
| 11242 | } |
| 11243 | if !errors.Is(msgs.Error(), nats.ErrFetchDisconnected) { |
| 11244 | t.Fatalf("Expected error: %v; got: %v", nats.ErrFetchDisconnected, msgs.Error()) |
| 11245 | } |
| 11246 | if err := <-errs; err != nil { |
| 11247 | t.Fatalf("Error on reconnect: %v", err) |
| 11248 | } |
| 11249 | } |
| 11250 | |
| 11251 | func TestJetStreamSubscribeShortTimeoutWithContext(t *testing.T) { |
| 11252 | srv := RunBasicJetStreamServer() |
nothing calls this directly
no test coverage detected