(t *testing.T)
| 237 | } |
| 238 | |
| 239 | func TestDrainConnection(t *testing.T) { |
| 240 | s := RunDefaultServer() |
| 241 | defer s.Shutdown() |
| 242 | |
| 243 | done := make(chan bool) |
| 244 | rdone := make(chan bool) |
| 245 | |
| 246 | closed := func(nc *nats.Conn) { |
| 247 | done <- true |
| 248 | } |
| 249 | |
| 250 | url := fmt.Sprintf("nats://127.0.0.1:%d", nats.DefaultPort) |
| 251 | nc, err := nats.Connect(url, nats.ClosedHandler(closed)) |
| 252 | if err != nil { |
| 253 | t.Fatalf("Failed to create default connection: %v", err) |
| 254 | } |
| 255 | defer nc.Close() |
| 256 | |
| 257 | nc2, err := nats.Connect(url) |
| 258 | if err != nil { |
| 259 | t.Fatalf("Failed to create default connection: %v", err) |
| 260 | } |
| 261 | defer nc2.Close() |
| 262 | |
| 263 | received := int32(0) |
| 264 | responses := int32(0) |
| 265 | expected := int32(50) |
| 266 | sleep := 10 * time.Millisecond |
| 267 | |
| 268 | // Create the listener for responses on "bar" |
| 269 | _, err = nc2.Subscribe("bar", func(_ *nats.Msg) { |
| 270 | r := atomic.AddInt32(&responses, 1) |
| 271 | if r == expected { |
| 272 | rdone <- true |
| 273 | } |
| 274 | }) |
| 275 | if err != nil { |
| 276 | t.Fatalf("Error creating subscription for responses: %v", err) |
| 277 | } |
| 278 | |
| 279 | // Create a slow subscriber for the responder |
| 280 | sub, err := nc.Subscribe("foo", func(m *nats.Msg) { |
| 281 | time.Sleep(sleep) |
| 282 | atomic.AddInt32(&received, 1) |
| 283 | err := nc.Publish(m.Reply, []byte("Stop bugging me")) |
| 284 | if err != nil { |
| 285 | t.Errorf("Publisher received an error sending response: %v\n", err) |
| 286 | } |
| 287 | }) |
| 288 | if err != nil { |
| 289 | t.Fatalf("Error creating subscription; %v", err) |
| 290 | } |
| 291 | |
| 292 | // Publish some messages |
| 293 | for i := int32(0); i < expected; i++ { |
| 294 | nc.PublishRequest("foo", "bar", []byte("Slow Slow")) |
| 295 | } |
| 296 |
nothing calls this directly
no test coverage detected