(t *testing.T)
| 194 | } |
| 195 | |
| 196 | func TestDrainSlowSubscriber(t *testing.T) { |
| 197 | s := RunDefaultServer() |
| 198 | defer s.Shutdown() |
| 199 | nc := NewDefaultConnection(t) |
| 200 | defer nc.Close() |
| 201 | |
| 202 | received := int32(0) |
| 203 | |
| 204 | sub, err := nc.Subscribe("foo", func(_ *nats.Msg) { |
| 205 | atomic.AddInt32(&received, 1) |
| 206 | time.Sleep(100 * time.Millisecond) |
| 207 | }) |
| 208 | if err != nil { |
| 209 | t.Fatalf("Error creating subscription; %v", err) |
| 210 | } |
| 211 | |
| 212 | total := 10 |
| 213 | for i := 0; i < total; i++ { |
| 214 | nc.Publish("foo", []byte("Slow Slow")) |
| 215 | } |
| 216 | nc.Flush() |
| 217 | |
| 218 | pmsgs, _, _ := sub.Pending() |
| 219 | if pmsgs != total && pmsgs != total-1 { |
| 220 | t.Fatalf("Expected most messages to be pending, but got %d vs %d", pmsgs, total) |
| 221 | } |
| 222 | sub.Drain() |
| 223 | |
| 224 | // Should take a second or so to drain away. |
| 225 | waitFor(t, 2*time.Second, 100*time.Millisecond, func() error { |
| 226 | // Wait for it to become invalid. Once drained it is unsubscribed. |
| 227 | _, _, err := sub.Pending() |
| 228 | if err != nats.ErrBadSubscription { |
| 229 | return errors.New("Still valid") |
| 230 | } |
| 231 | r := int(atomic.LoadInt32(&received)) |
| 232 | if r != total { |
| 233 | t.Fatalf("Did not receive all messages, got %d vs %d", r, total) |
| 234 | } |
| 235 | return nil |
| 236 | }) |
| 237 | } |
| 238 | |
| 239 | func TestDrainConnection(t *testing.T) { |
| 240 | s := RunDefaultServer() |
nothing calls this directly
no test coverage detected