hub / github.com/nats-io/nats.go / TestJetStream_Consumer

Function TestJetStream_Consumer

jetstream/test/jetstream_test.go:1654–1768
(t *testing.T)

Source from the content-addressed store, hash-verified

func TestJetStream_Consumer(t *testing.T) {
	tests := []struct {
		name      string
		stream    string
		durable   string
		timeout   time.Duration
		withError error
	}{
		{
			name:    "get existing consumer",
			stream:  "foo",
			durable: "dur",
			timeout: 10 * time.Second,
		},
		{
			name:    "with empty context",
			stream:  "foo",
			durable: "dur",
		},
		{
			name:      "consumer does not exist",
			stream:    "foo",
			durable:   "abc",
			timeout:   10 * time.Second,
			withError: jetstream.ErrConsumerNotFound,
		},
		{
			name:      "invalid durable name",
			stream:    "foo",
			durable:   "dur.123",
			withError: jetstream.ErrInvalidConsumerName,
		},
		{
			name:      "stream does not exist",
			stream:    "abc",
			durable:   "dur",
			withError: jetstream.ErrStreamNotFound,
		},
		{
			name:      "invalid stream name",
			stream:    "foo.1",
			durable:   "dur",
			withError: jetstream.ErrInvalidStreamName,
		},
		{
			name:      "context timeout",
			stream:    "foo",
			durable:   "dur",
			timeout:   1 * time.Microsecond,
			withError: context.DeadlineExceeded,
		},
	}

	srv := RunBasicJetStreamServer()
	defer shutdownJSServerAndRemoveStorage(t, srv)
	nc, err := nats.Connect(srv.ClientURL())
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
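
The listing is cut off before the loop that consumes the table, so the following is only a minimal sketch of how a table like this is commonly checked in Go. It is not the body of TestJetStream_Consumer. The names `lookup`, `runCase`, `errConsumerNotFound`, and `errStreamNotFound` are hypothetical stand-ins, not part of nats.go. The sketch assumes a zero `timeout` means "use a context without a deadline" and that expected errors are compared with `errors.Is`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Hypothetical sentinel errors standing in for the jetstream.Err* values.
var (
	errConsumerNotFound = errors.New("consumer not found")
	errStreamNotFound   = errors.New("stream not found")
)

// lookup is a hypothetical stand-in for a consumer lookup. It simulates
// 10ms of work and gives up early if the context is done first.
func lookup(ctx context.Context, stream, durable string) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(10 * time.Millisecond):
	}
	if stream != "foo" {
		return errStreamNotFound
	}
	if durable != "dur" {
		// Wrapped errors still match their sentinel through errors.Is.
		return fmt.Errorf("lookup %q: %w", durable, errConsumerNotFound)
	}
	return nil
}

// runCase builds a context from the case's timeout (zero means no deadline)
// and reports whether the returned error matches the expected one.
func runCase(stream, durable string, timeout time.Duration, want error) bool {
	ctx := context.Background()
	if timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}
	err := lookup(ctx, stream, durable)
	if want == nil {
		return err == nil
	}
	return errors.Is(err, want)
}

func main() {
	fmt.Println(runCase("foo", "dur", 10*time.Second, nil))
	fmt.Println(runCase("foo", "dur", 0, nil))
	fmt.Println(runCase("foo", "abc", 10*time.Second, errConsumerNotFound))
	fmt.Println(runCase("abc", "dur", 0, errStreamNotFound))
	fmt.Println(runCase("foo", "dur", time.Microsecond, context.DeadlineExceeded))
}
```

Each call in `main` mirrors one row of the table above and prints whether the observed error matched the expectation. In a real test the same check would typically sit inside `t.Run(test.name, ...)` and call `t.Fatalf` on a mismatch.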

Callers

nothing calls this directly

Calls 13

CachedInfo · Method · 0.95
New · Function · 0.92
Connect · Method · 0.80
Fatalf · Method · 0.80
RunBasicJetStreamServer · Function · 0.70
CreateStream · Method · 0.65
Consumer · Method · 0.65
PushConsumer · Method · 0.65
DeleteStream · Method · 0.65
Close · Method · 0.45

Tested by

no test coverage detected