(t *testing.T)
| 1388 | } |
| 1389 | |
| 1390 | func TestPullSubscribeFetchDrain(t *testing.T) { |
| 1391 | s := RunBasicJetStreamServer() |
| 1392 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 1393 | |
| 1394 | nc, js := jsClient(t, s) |
| 1395 | defer nc.Close() |
| 1396 | |
| 1397 | _, err := js.AddStream(&nats.StreamConfig{ |
| 1398 | Name: "TEST", |
| 1399 | Subjects: []string{"foo"}, |
| 1400 | }) |
| 1401 | if err != nil { |
| 1402 | t.Fatalf("Unexpected error: %v", err) |
| 1403 | } |
| 1404 | |
| 1405 | defer js.PurgeStream("TEST") |
| 1406 | sub, err := js.PullSubscribe("foo", "") |
| 1407 | if err != nil { |
| 1408 | t.Fatalf("Unexpected error: %s", err) |
| 1409 | } |
| 1410 | for i := 0; i < 100; i++ { |
| 1411 | if _, err := js.Publish("foo", []byte("msg")); err != nil { |
| 1412 | t.Fatalf("Unexpected error: %s", err) |
| 1413 | } |
| 1414 | } |
| 1415 | // fill buffer with messages |
| 1416 | cinfo, err := sub.ConsumerInfo() |
| 1417 | if err != nil { |
| 1418 | t.Fatalf("Unexpected error: %s", err) |
| 1419 | } |
| 1420 | nextSubject := fmt.Sprintf("$JS.API.CONSUMER.MSG.NEXT.TEST.%s", cinfo.Name) |
| 1421 | replySubject := strings.Replace(sub.Subject, "*", "abc", 1) |
| 1422 | payload := `{"batch":10,"no_wait":true}` |
| 1423 | if err := nc.PublishRequest(nextSubject, replySubject, []byte(payload)); err != nil { |
| 1424 | t.Fatalf("Unexpected error: %s", err) |
| 1425 | } |
| 1426 | time.Sleep(100 * time.Millisecond) |
| 1427 | |
| 1428 | // now drain the subscription, messages should be in the buffer |
| 1429 | sub.Drain() |
| 1430 | msgs, err := sub.Fetch(100) |
| 1431 | if err != nil { |
| 1432 | t.Fatalf("Unexpected error: %s", err) |
| 1433 | } |
| 1434 | for _, msg := range msgs { |
| 1435 | msg.Ack() |
| 1436 | } |
| 1437 | if len(msgs) != 10 { |
| 1438 | t.Fatalf("Expected %d messages; got: %d", 10, len(msgs)) |
| 1439 | } |
| 1440 | |
| 1441 | // subsequent fetch should return error, subscription is already drained |
| 1442 | _, err = sub.Fetch(10, nats.MaxWait(100*time.Millisecond)) |
| 1443 | if !errors.Is(err, nats.ErrSubscriptionClosed) { |
| 1444 | t.Fatalf("Expected error: %s; got: %s", nats.ErrSubscriptionClosed, err) |
| 1445 | } |
| 1446 | } |
| 1447 |
nothing calls this directly
no test coverage detected