(t *testing.T)
| 1592 | } |
| 1593 | |
| 1594 | func TestPublishMsgAsyncWithPendingMsgs(t *testing.T) { |
| 1595 | t.Run("outstanding ack exceed limit", func(t *testing.T) { |
| 1596 | srv := RunBasicJetStreamServer() |
| 1597 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 1598 | nc, err := nats.Connect(srv.ClientURL()) |
| 1599 | if err != nil { |
| 1600 | t.Fatalf("Unexpected error: %v", err) |
| 1601 | } |
| 1602 | |
| 1603 | js, err := jetstream.New(nc, jetstream.WithPublishAsyncMaxPending(5)) |
| 1604 | if err != nil { |
| 1605 | t.Fatalf("Unexpected error: %v", err) |
| 1606 | } |
| 1607 | defer nc.Close() |
| 1608 | |
| 1609 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 1610 | defer cancel() |
| 1611 | |
| 1612 | _, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) |
| 1613 | if err != nil { |
| 1614 | t.Fatalf("Unexpected error: %v", err) |
| 1615 | } |
| 1616 | |
| 1617 | for i := 0; i < 20; i++ { |
| 1618 | _, err = js.PublishAsync("FOO.1", []byte("msg")) |
| 1619 | if err != nil { |
| 1620 | t.Fatalf("Unexpected error: %v", err) |
| 1621 | } |
| 1622 | if numPending := js.PublishAsyncPending(); numPending > 5 { |
| 1623 | t.Fatalf("Expected 5 pending messages, got: %d", numPending) |
| 1624 | } |
| 1625 | } |
| 1626 | }) |
| 1627 | t.Run("too many messages without ack", func(t *testing.T) { |
| 1628 | srv := RunBasicJetStreamServer() |
| 1629 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 1630 | nc, err := nats.Connect(srv.ClientURL()) |
| 1631 | if err != nil { |
| 1632 | t.Fatalf("Unexpected error: %v", err) |
| 1633 | } |
| 1634 | |
| 1635 | js, err := jetstream.New(nc, jetstream.WithPublishAsyncMaxPending(5)) |
| 1636 | if err != nil { |
| 1637 | t.Fatalf("Unexpected error: %v", err) |
| 1638 | } |
| 1639 | defer nc.Close() |
| 1640 | |
| 1641 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 1642 | defer cancel() |
| 1643 | |
| 1644 | _, err = js.CreateStream(ctx, jetstream.StreamConfig{ |
| 1645 | Name: "foo", |
| 1646 | Subjects: []string{"FOO.*"}, |
| 1647 | // disable stream acks |
| 1648 | NoAck: true, |
| 1649 | }) |
| 1650 | if err != nil { |
| 1651 | t.Fatalf("Unexpected error: %v", err) |
nothing calls this directly
no test coverage detected