(t *testing.T)
| 1337 | } |
| 1338 | |
| 1339 | func TestPullSubscribeConsumerDoesNotExist(t *testing.T) { |
| 1340 | s := RunBasicJetStreamServer() |
| 1341 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 1342 | |
| 1343 | nc, js := jsClient(t, s) |
| 1344 | defer nc.Close() |
| 1345 | |
| 1346 | _, err := js.AddStream(&nats.StreamConfig{ |
| 1347 | Name: "TEST", |
| 1348 | Subjects: []string{"foo"}, |
| 1349 | }) |
| 1350 | if err != nil { |
| 1351 | t.Fatalf("Unexpected error: %v", err) |
| 1352 | } |
| 1353 | |
| 1354 | sub, err := js.PullSubscribe("foo", "") |
| 1355 | if err != nil { |
| 1356 | t.Fatalf("Unexpected error: %s", err) |
| 1357 | } |
| 1358 | defer sub.Unsubscribe() |
| 1359 | |
| 1360 | info, err := sub.ConsumerInfo() |
| 1361 | if err != nil { |
| 1362 | t.Fatalf("Unexpected error: %v", err) |
| 1363 | } |
| 1364 | if err := js.DeleteConsumer("TEST", info.Name); err != nil { |
| 1365 | t.Fatalf("Unexpected error: %v", err) |
| 1366 | } |
| 1367 | _, err = sub.Fetch(5) |
| 1368 | if !errors.Is(err, nats.ErrNoResponders) { |
| 1369 | t.Fatalf("Expected no responders error; got: %v", err) |
| 1370 | } |
| 1371 | |
| 1372 | msgs, err := sub.FetchBatch(5) |
| 1373 | if err != nil { |
| 1374 | t.Fatalf("Unexpected error: %v", err) |
| 1375 | } |
| 1376 | select { |
| 1377 | case _, ok := <-msgs.Messages(): |
| 1378 | if ok { |
| 1379 | t.Fatalf("Expected no messages") |
| 1380 | } |
| 1381 | case <-time.After(time.Second): |
| 1382 | t.Fatalf("Timeout waiting for messages") |
| 1383 | } |
| 1384 | |
| 1385 | if !errors.Is(msgs.Error(), nats.ErrNoResponders) { |
| 1386 | t.Fatalf("Expected no responders error; got: %v", msgs.Error()) |
| 1387 | } |
| 1388 | } |
| 1389 | |
| 1390 | func TestPullSubscribeFetchDrain(t *testing.T) { |
| 1391 | s := RunBasicJetStreamServer() |
nothing calls this directly
no test coverage detected