(t *testing.T)
| 506 | } |
| 507 | |
| 508 | func TestSlowAsyncSubscriber(t *testing.T) { |
| 509 | s := RunDefaultServer() |
| 510 | defer s.Shutdown() |
| 511 | |
| 512 | nc := NewDefaultConnection(t) |
| 513 | defer nc.Close() |
| 514 | |
| 515 | // Override default handler for test. |
| 516 | nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {}) |
| 517 | |
| 518 | bch := make(chan bool) |
| 519 | |
| 520 | sub, _ := nc.Subscribe("foo", func(m *nats.Msg) { |
| 521 | // block to back us up.. |
| 522 | <-bch |
| 523 | // Avoid repeated calls that would then block again |
| 524 | m.Sub.Unsubscribe() |
| 525 | }) |
| 526 | // Make sure these are the defaults |
| 527 | pm, pb, _ := sub.PendingLimits() |
| 528 | if pm != nats.DefaultSubPendingMsgsLimit { |
| 529 | t.Fatalf("Pending limit for number of msgs incorrect, expected %d, got %d\n", nats.DefaultSubPendingMsgsLimit, pm) |
| 530 | } |
| 531 | if pb != nats.DefaultSubPendingBytesLimit { |
| 532 | t.Fatalf("Pending limit for number of bytes incorrect, expected %d, got %d\n", nats.DefaultSubPendingBytesLimit, pb) |
| 533 | } |
| 534 | |
| 535 | // Set new limits |
| 536 | pml := 100 |
| 537 | pbl := 1024 * 1024 |
| 538 | |
| 539 | sub.SetPendingLimits(pml, pbl) |
| 540 | |
| 541 | // Make sure the set is correct |
| 542 | pm, pb, _ = sub.PendingLimits() |
| 543 | if pm != pml { |
| 544 | t.Fatalf("Pending limit for number of msgs incorrect, expected %d, got %d\n", pml, pm) |
| 545 | } |
| 546 | if pb != pbl { |
| 547 | t.Fatalf("Pending limit for number of bytes incorrect, expected %d, got %d\n", pbl, pb) |
| 548 | } |
| 549 | |
| 550 | for range int(pml) + 100 { |
| 551 | nc.Publish("foo", []byte("Hello")) |
| 552 | } |
| 553 | |
| 554 | timeout := 5 * time.Second |
| 555 | start := time.Now() |
| 556 | err := nc.FlushTimeout(timeout) |
| 557 | elapsed := time.Since(start) |
| 558 | if elapsed >= timeout { |
| 559 | t.Fatalf("Flush did not return before timeout") |
| 560 | } |
| 561 | // We want flush to work, so expect no error for it. |
| 562 | if err != nil { |
| 563 | t.Fatalf("Expected no error from Flush()\n") |
| 564 | } |
| 565 | if nc.LastError() != nats.ErrSlowConsumer { |
nothing calls this directly
no test coverage detected