(t *testing.T)
| 450 | } |
| 451 | |
| 452 | func TestSlowSubscriber(t *testing.T) { |
| 453 | s := RunDefaultServer() |
| 454 | defer s.Shutdown() |
| 455 | |
| 456 | nc := NewDefaultConnection(t) |
| 457 | defer nc.Close() |
| 458 | |
| 459 | // Override default handler for test. |
| 460 | nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {}) |
| 461 | |
| 462 | sub, _ := nc.SubscribeSync("foo") |
| 463 | sub.SetPendingLimits(100, 1024) |
| 464 | |
| 465 | for range 200 { |
| 466 | nc.Publish("foo", []byte("Hello")) |
| 467 | } |
| 468 | timeout := 5 * time.Second |
| 469 | start := time.Now() |
| 470 | nc.FlushTimeout(timeout) |
| 471 | elapsed := time.Since(start) |
| 472 | if elapsed >= timeout { |
| 473 | t.Fatalf("Flush did not return before timeout: %d > %d", elapsed, timeout) |
| 474 | } |
| 475 | // Make sure NextMsg returns an error to indicate slow consumer |
| 476 | _, err := sub.NextMsg(200 * time.Millisecond) |
| 477 | if err == nil { |
| 478 | t.Fatalf("NextMsg did not return an error") |
| 479 | } |
| 480 | } |
| 481 | |
| 482 | func TestSlowChanSubscriber(t *testing.T) { |
| 483 | s := RunDefaultServer() |
nothing calls this directly
no test coverage detected