MCPcopy
hub / github.com/nats-io/nats.go / TestPullSubscribeConsumerDeleted

Function TestPullSubscribeConsumerDeleted

test/js_test.go:2022–2077  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

2020}
2021
2022func TestPullSubscribeConsumerDeleted(t *testing.T) {
2023 s := RunBasicJetStreamServer()
2024 defer shutdownJSServerAndRemoveStorage(t, s)
2025
2026 nc, js := jsClient(t, s)
2027 defer nc.Close()
2028
2029 _, err := js.AddStream(&nats.StreamConfig{
2030 Name: "TEST",
2031 Subjects: []string{"foo"},
2032 })
2033 if err != nil {
2034 t.Fatalf("Unexpected error: %v", err)
2035 }
2036 if _, err := js.Publish("foo", []byte("msg")); err != nil {
2037 t.Fatal(err)
2038 }
2039 t.Run("delete consumer", func(t *testing.T) {
2040 sub, err := js.PullSubscribe("foo", "cons")
2041 if err != nil {
2042 t.Fatal(err)
2043 }
2044 defer sub.Unsubscribe()
2045 if err != nil {
2046 t.Fatal(err)
2047 }
2048
2049 if _, err = sub.Fetch(1, nats.MaxWait(10*time.Millisecond)); err != nil {
2050 t.Fatalf("Expected error: %v; got: %v", nats.ErrTimeout, err)
2051 }
2052 time.AfterFunc(50*time.Millisecond, func() { js.DeleteConsumer("TEST", "cons") })
2053 if _, err = sub.Fetch(1, nats.MaxWait(100*time.Millisecond)); !errors.Is(err, nats.ErrConsumerDeleted) {
2054 t.Fatalf("Expected error: %v; got: %v", nats.ErrConsumerDeleted, err)
2055 }
2056 })
2057
2058 t.Run("delete stream", func(t *testing.T) {
2059 sub, err := js.PullSubscribe("foo", "cons")
2060 if err != nil {
2061 t.Fatal(err)
2062 }
2063 defer sub.Unsubscribe()
2064 if err != nil {
2065 t.Fatal(err)
2066 }
2067
2068 if _, err = sub.Fetch(1, nats.MaxWait(10*time.Millisecond)); err != nil {
2069 t.Fatalf("Expected error: %v; got: %v", nats.ErrTimeout, err)
2070 }
2071 time.AfterFunc(50*time.Millisecond, func() { js.DeleteStream("TEST") })
2072 if _, err = sub.Fetch(1, nats.MaxWait(100*time.Millisecond)); !errors.Is(err, nats.ErrConsumerDeleted) {
2073 t.Fatalf("Expected error: %v; got: %v", nats.ErrConsumerDeleted, err)
2074 }
2075 })
2076
2077}
2078
2079func TestJetStreamAckPending_Pull(t *testing.T) {

Callers

nothing calls this directly

Calls 13

FatalfMethod · 0.80
UnsubscribeMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
jsClientFunction · 0.70
AddStreamMethod · 0.65
PublishMethod · 0.65
PullSubscribeMethod · 0.65
FetchMethod · 0.65
DeleteConsumerMethod · 0.65
DeleteStreamMethod · 0.65
CloseMethod · 0.45

Tested by

no test coverage detected