(t *testing.T)
| 1214 | } |
| 1215 | |
| 1216 | func TestAsyncSubscriptionPendingDrain(t *testing.T) { |
| 1217 | s := RunDefaultServer() |
| 1218 | defer s.Shutdown() |
| 1219 | |
| 1220 | nc := NewDefaultConnection(t) |
| 1221 | defer nc.Close() |
| 1222 | |
| 1223 | // Send some messages to ourselves. |
| 1224 | total := 100 |
| 1225 | msg := []byte("0123456789") |
| 1226 | |
| 1227 | sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) {}) |
| 1228 | defer sub.Unsubscribe() |
| 1229 | |
| 1230 | for range total { |
| 1231 | nc.Publish("foo", msg) |
| 1232 | } |
| 1233 | nc.Flush() |
| 1234 | |
| 1235 | // Wait for all delivered. |
| 1236 | waitFor(t, 2*time.Second, 15*time.Millisecond, func() error { |
| 1237 | if d, _ := sub.Delivered(); d != int64(total) { |
| 1238 | return fmt.Errorf("Wrong delivered count: %v vs %v", d, total) |
| 1239 | } |
| 1240 | m, b, _ := sub.Pending() |
| 1241 | if m != 0 { |
| 1242 | return fmt.Errorf("Expected msgs of 0, got %d", m) |
| 1243 | } |
| 1244 | if b != 0 { |
| 1245 | return fmt.Errorf("Expected bytes of 0, got %d", b) |
| 1246 | } |
| 1247 | return nil |
| 1248 | }) |
| 1249 | |
| 1250 | sub.Unsubscribe() |
| 1251 | if _, err := sub.Delivered(); err == nil { |
| 1252 | t.Fatal("Calling Delivered() on closed subscription should fail") |
| 1253 | } |
| 1254 | } |
| 1255 | |
| 1256 | func TestSyncSubscriptionPendingDrain(t *testing.T) { |
| 1257 | s := RunDefaultServer() |
nothing calls this directly
no test coverage detected