MCPcopy
hub / github.com/nats-io/nats.go / TestPublishMsgAsyncWithPendingMsgs

Function TestPublishMsgAsyncWithPendingMsgs

jetstream/test/publish_test.go:1594–1701  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1592}
1593
1594func TestPublishMsgAsyncWithPendingMsgs(t *testing.T) {
1595 t.Run("outstanding ack exceed limit", func(t *testing.T) {
1596 srv := RunBasicJetStreamServer()
1597 defer shutdownJSServerAndRemoveStorage(t, srv)
1598 nc, err := nats.Connect(srv.ClientURL())
1599 if err != nil {
1600 t.Fatalf("Unexpected error: %v", err)
1601 }
1602
1603 js, err := jetstream.New(nc, jetstream.WithPublishAsyncMaxPending(5))
1604 if err != nil {
1605 t.Fatalf("Unexpected error: %v", err)
1606 }
1607 defer nc.Close()
1608
1609 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1610 defer cancel()
1611
1612 _, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
1613 if err != nil {
1614 t.Fatalf("Unexpected error: %v", err)
1615 }
1616
1617 for i := 0; i < 20; i++ {
1618 _, err = js.PublishAsync("FOO.1", []byte("msg"))
1619 if err != nil {
1620 t.Fatalf("Unexpected error: %v", err)
1621 }
1622 if numPending := js.PublishAsyncPending(); numPending > 5 {
1623 t.Fatalf("Expected 5 pending messages, got: %d", numPending)
1624 }
1625 }
1626 })
1627 t.Run("too many messages without ack", func(t *testing.T) {
1628 srv := RunBasicJetStreamServer()
1629 defer shutdownJSServerAndRemoveStorage(t, srv)
1630 nc, err := nats.Connect(srv.ClientURL())
1631 if err != nil {
1632 t.Fatalf("Unexpected error: %v", err)
1633 }
1634
1635 js, err := jetstream.New(nc, jetstream.WithPublishAsyncMaxPending(5))
1636 if err != nil {
1637 t.Fatalf("Unexpected error: %v", err)
1638 }
1639 defer nc.Close()
1640
1641 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1642 defer cancel()
1643
1644 _, err = js.CreateStream(ctx, jetstream.StreamConfig{
1645 Name: "foo",
1646 Subjects: []string{"FOO.*"},
1647 // disable stream acks
1648 NoAck: true,
1649 })
1650 if err != nil {
1651 t.Fatalf("Unexpected error: %v", err)

Callers

nothing calls this directly

Calls 14

NewFunction · 0.92
WithStallWaitFunction · 0.92
ConnectMethod · 0.80
FatalfMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
restartBasicJSServerFunction · 0.70
CreateStreamMethod · 0.65
PublishAsyncMethod · 0.65
PublishAsyncPendingMethod · 0.65
PublishAsyncCompleteMethod · 0.65

Tested by

no test coverage detected