MCPcopy
hub / github.com/nats-io/nats.go / TestPullSubscribeFetchBatchWithHeartbeat

Function TestPullSubscribeFetchBatchWithHeartbeat

test/js_test.go:1448–1587  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1446}
1447
1448func TestPullSubscribeFetchBatchWithHeartbeat(t *testing.T) {
1449 t.Skip("Since v2.10.26 server sends no responders if the consumer is deleted, we need to figure out how else to test missing heartbeats")
1450 s := RunBasicJetStreamServer()
1451 defer shutdownJSServerAndRemoveStorage(t, s)
1452
1453 nc, js := jsClient(t, s)
1454 defer nc.Close()
1455
1456 _, err := js.AddStream(&nats.StreamConfig{
1457 Name: "TEST",
1458 Subjects: []string{"foo"},
1459 })
1460 if err != nil {
1461 t.Fatalf("Unexpected error: %v", err)
1462 }
1463
1464 sub, err := js.PullSubscribe("foo", "")
1465 if err != nil {
1466 t.Fatalf("Unexpected error: %s", err)
1467 }
1468 defer sub.Unsubscribe()
1469 for i := 0; i < 5; i++ {
1470 if _, err := js.Publish("foo", []byte("msg")); err != nil {
1471 t.Fatalf("Unexpected error: %s", err)
1472 }
1473 }
1474
1475 // fetch 5 messages, should finish immediately
1476 msgs, err := sub.FetchBatch(5, nats.PullHeartbeat(100*time.Millisecond))
1477 if err != nil {
1478 t.Fatalf("Unexpected error: %s", err)
1479 }
1480 var i int
1481 for msg := range msgs.Messages() {
1482 i++
1483 msg.Ack()
1484 }
1485 if i != 5 {
1486 t.Fatalf("Expected %d messages; got: %d", 5, i)
1487 }
1488 if msgs.Error() != nil {
1489 t.Fatalf("Unexpected error: %s", msgs.Error())
1490 }
1491 now := time.Now()
1492 // no messages available, should time out normally
1493 msgs, err = sub.FetchBatch(5, nats.PullHeartbeat(50*time.Millisecond), nats.MaxWait(300*time.Millisecond))
1494 if err != nil {
1495 t.Fatalf("Unexpected error: %s", err)
1496 }
1497 i = 0
1498 for msg := range msgs.Messages() {
1499 i++
1500 msg.Ack()
1501 }
1502 elapsed := time.Since(now)
1503 if i != 0 {
1504 t.Fatalf("Expected %d messages; got: %d", 0, i)
1505 }

Callers

nothing calls this directly

Calls 15

Fatalf · Method · 0.80
Unsubscribe · Method · 0.80
FetchBatch · Method · 0.80
JetStream · Method · 0.80
RunBasicJetStreamServer · Function · 0.70
jsClient · Function · 0.70
AddStream · Method · 0.65
PullSubscribe · Method · 0.65
Publish · Method · 0.65
Messages · Method · 0.65
Ack · Method · 0.65

Tested by

no test coverage detected