More advanced tests on subscriptions
(t *testing.T)
| 29 | // More advanced tests on subscriptions |
| 30 | |
| 31 | func TestServerAutoUnsub(t *testing.T) { |
| 32 | s := RunDefaultServer() |
| 33 | defer s.Shutdown() |
| 34 | |
| 35 | nc := NewDefaultConnection(t) |
| 36 | defer nc.Close() |
| 37 | received := int32(0) |
| 38 | max := int32(10) |
| 39 | |
| 40 | // Call this to make sure that we have everything setup connection wise |
| 41 | nc.Flush() |
| 42 | |
| 43 | // When this test is run by itself it's fine, but when run with others |
| 44 | // we need to make sure the go routines reading has settled. |
| 45 | time.Sleep(250 * time.Millisecond) |
| 46 | |
| 47 | base := getStableNumGoroutine(t) |
| 48 | |
| 49 | sub, err := nc.Subscribe("foo", func(_ *nats.Msg) { |
| 50 | atomic.AddInt32(&received, 1) |
| 51 | }) |
| 52 | if err != nil { |
| 53 | t.Fatal("Failed to subscribe: ", err) |
| 54 | } |
| 55 | sub.AutoUnsubscribe(int(max)) |
| 56 | total := 100 |
| 57 | for range total { |
| 58 | nc.Publish("foo", []byte("Hello")) |
| 59 | } |
| 60 | nc.Flush() |
| 61 | time.Sleep(100 * time.Millisecond) |
| 62 | |
| 63 | if atomic.LoadInt32(&received) != max { |
| 64 | t.Fatalf("Received %d msgs, wanted only %d\n", received, max) |
| 65 | } |
| 66 | if sub.IsValid() { |
| 67 | t.Fatal("Expected subscription to be invalid after hitting max") |
| 68 | } |
| 69 | if err := sub.AutoUnsubscribe(10); err == nil { |
| 70 | t.Fatal("Calling AutoUnsubscribe() on closed subscription should fail") |
| 71 | } |
| 72 | checkNoGoroutineLeak(t, base, "AutoUnsubscribe() limit reached") |
| 73 | } |
| 74 | |
| 75 | func TestClientSyncAutoUnsub(t *testing.T) { |
| 76 | s := RunDefaultServer() |
nothing calls this directly
no test coverage detected