(t *testing.T, srvs ...*jsServer)
| 9312 | } |
| 9313 | |
| 9314 | func testJetStreamFetchContext(t *testing.T, srvs ...*jsServer) { |
| 9315 | srv := srvs[0] |
| 9316 | nc, js := jsClient(t, srv.Server) |
| 9317 | defer nc.Close() |
| 9318 | |
| 9319 | var err error |
| 9320 | |
| 9321 | subject := "WQ" |
| 9322 | _, err = js.AddStream(&nats.StreamConfig{ |
| 9323 | Name: subject, |
| 9324 | Replicas: 3, |
| 9325 | }) |
| 9326 | if err != nil { |
| 9327 | t.Fatal(err) |
| 9328 | } |
| 9329 | |
| 9330 | sendMsgs := func(t *testing.T, totalMsgs int) { |
| 9331 | t.Helper() |
| 9332 | for i := 0; i < totalMsgs; i++ { |
| 9333 | payload := fmt.Sprintf("i:%d", i) |
| 9334 | _, err := js.Publish(subject, []byte(payload)) |
| 9335 | if err != nil { |
| 9336 | t.Errorf("Unexpected error: %v", err) |
| 9337 | } |
| 9338 | } |
| 9339 | } |
| 9340 | expected := 10 |
| 9341 | sendMsgs(t, expected) |
| 9342 | |
| 9343 | sub, err := js.PullSubscribe(subject, "batch-ctx") |
| 9344 | if err != nil { |
| 9345 | t.Fatal(err) |
| 9346 | } |
| 9347 | defer sub.Unsubscribe() |
| 9348 | |
| 9349 | t.Run("ctx background", func(t *testing.T) { |
| 9350 | _, err = sub.Fetch(expected, nats.Context(context.Background())) |
| 9351 | if err == nil { |
| 9352 | t.Fatal("Unexpected success") |
| 9353 | } |
| 9354 | if err != nats.ErrNoDeadlineContext { |
| 9355 | t.Errorf("Expected context deadline error, got: %v", err) |
| 9356 | } |
| 9357 | }) |
| 9358 | |
| 9359 | t.Run("ctx canceled", func(t *testing.T) { |
| 9360 | ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) |
| 9361 | cancel() |
| 9362 | |
| 9363 | _, err = sub.Fetch(expected, nats.Context(ctx)) |
| 9364 | if err == nil { |
| 9365 | t.Fatal("Unexpected success") |
| 9366 | } |
| 9367 | if err != context.Canceled { |
| 9368 | t.Errorf("Expected context deadline error, got: %v", err) |
| 9369 | } |
| 9370 | |
| 9371 | ctx, cancel = context.WithCancel(context.Background()) |
nothing calls this directly
no test coverage detected