(t *testing.T)
| 2129 | } |
| 2130 | |
| 2131 | func TestJetStreamAckPending_Push(t *testing.T) { |
| 2132 | s := RunBasicJetStreamServer() |
| 2133 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 2134 | |
| 2135 | nc, js := jsClient(t, s) |
| 2136 | defer nc.Close() |
| 2137 | |
| 2138 | var err error |
| 2139 | |
| 2140 | _, err = js.AddStream(&nats.StreamConfig{ |
| 2141 | Name: "TEST", |
| 2142 | Subjects: []string{"foo"}, |
| 2143 | }) |
| 2144 | if err != nil { |
| 2145 | t.Fatalf("Unexpected error: %v", err) |
| 2146 | } |
| 2147 | |
| 2148 | const totalMsgs = 3 |
| 2149 | for i := 0; i < totalMsgs; i++ { |
| 2150 | if _, err := js.Publish("foo", []byte(fmt.Sprintf("msg %d", i))); err != nil { |
| 2151 | t.Fatal(err) |
| 2152 | } |
| 2153 | } |
| 2154 | |
| 2155 | sub, err := js.SubscribeSync("foo", |
| 2156 | nats.Durable("dname-wait"), |
| 2157 | nats.AckWait(100*time.Millisecond), |
| 2158 | nats.MaxDeliver(5), |
| 2159 | nats.MaxAckPending(3), |
| 2160 | ) |
| 2161 | if err != nil { |
| 2162 | t.Fatal(err) |
| 2163 | } |
| 2164 | defer sub.Unsubscribe() |
| 2165 | |
| 2166 | // 3 messages delivered 5 times. |
| 2167 | expected := 15 |
| 2168 | timeout := time.Now().Add(2 * time.Second) |
| 2169 | pending := 0 |
| 2170 | for time.Now().Before(timeout) { |
| 2171 | if pending, _, _ = sub.Pending(); pending >= expected { |
| 2172 | break |
| 2173 | } |
| 2174 | time.Sleep(10 * time.Millisecond) |
| 2175 | } |
| 2176 | if pending < expected { |
| 2177 | t.Errorf("Expected %v, got %v", expected, pending) |
| 2178 | } |
| 2179 | |
| 2180 | info, err := sub.ConsumerInfo() |
| 2181 | if err != nil { |
| 2182 | t.Fatal(err) |
| 2183 | } |
| 2184 | |
| 2185 | got := info.NumRedelivered |
| 2186 | expected = 3 |
| 2187 | if got < expected { |
| 2188 | t.Errorf("Expected %v, got: %v", expected, got) |
nothing calls this directly
no test coverage detected