(t *testing.T)
| 2020 | } |
| 2021 | |
| 2022 | func TestPullSubscribeConsumerDeleted(t *testing.T) { |
| 2023 | s := RunBasicJetStreamServer() |
| 2024 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 2025 | |
| 2026 | nc, js := jsClient(t, s) |
| 2027 | defer nc.Close() |
| 2028 | |
| 2029 | _, err := js.AddStream(&nats.StreamConfig{ |
| 2030 | Name: "TEST", |
| 2031 | Subjects: []string{"foo"}, |
| 2032 | }) |
| 2033 | if err != nil { |
| 2034 | t.Fatalf("Unexpected error: %v", err) |
| 2035 | } |
| 2036 | if _, err := js.Publish("foo", []byte("msg")); err != nil { |
| 2037 | t.Fatal(err) |
| 2038 | } |
| 2039 | t.Run("delete consumer", func(t *testing.T) { |
| 2040 | sub, err := js.PullSubscribe("foo", "cons") |
| 2041 | if err != nil { |
| 2042 | t.Fatal(err) |
| 2043 | } |
| 2044 | defer sub.Unsubscribe() |
| 2045 | if err != nil { |
| 2046 | t.Fatal(err) |
| 2047 | } |
| 2048 | |
| 2049 | if _, err = sub.Fetch(1, nats.MaxWait(10*time.Millisecond)); err != nil { |
| 2050 | t.Fatalf("Expected error: %v; got: %v", nats.ErrTimeout, err) |
| 2051 | } |
| 2052 | time.AfterFunc(50*time.Millisecond, func() { js.DeleteConsumer("TEST", "cons") }) |
| 2053 | if _, err = sub.Fetch(1, nats.MaxWait(100*time.Millisecond)); !errors.Is(err, nats.ErrConsumerDeleted) { |
| 2054 | t.Fatalf("Expected error: %v; got: %v", nats.ErrConsumerDeleted, err) |
| 2055 | } |
| 2056 | }) |
| 2057 | |
| 2058 | t.Run("delete stream", func(t *testing.T) { |
| 2059 | sub, err := js.PullSubscribe("foo", "cons") |
| 2060 | if err != nil { |
| 2061 | t.Fatal(err) |
| 2062 | } |
| 2063 | defer sub.Unsubscribe() |
| 2064 | if err != nil { |
| 2065 | t.Fatal(err) |
| 2066 | } |
| 2067 | |
| 2068 | if _, err = sub.Fetch(1, nats.MaxWait(10*time.Millisecond)); err != nil { |
| 2069 | t.Fatalf("Expected error: %v; got: %v", nats.ErrTimeout, err) |
| 2070 | } |
| 2071 | time.AfterFunc(50*time.Millisecond, func() { js.DeleteStream("TEST") }) |
| 2072 | if _, err = sub.Fetch(1, nats.MaxWait(100*time.Millisecond)); !errors.Is(err, nats.ErrConsumerDeleted) { |
| 2073 | t.Fatalf("Expected error: %v; got: %v", nats.ErrConsumerDeleted, err) |
| 2074 | } |
| 2075 | }) |
| 2076 | |
| 2077 | } |
| 2078 | |
| 2079 | func TestJetStreamAckPending_Pull(t *testing.T) { |
nothing calls this directly
no test coverage detected