MCPcopy
hub / github.com/nats-io/nats.go / TestDrainConnection

Function TestDrainConnection

test/drain_test.go:239–340  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

237}
238
239func TestDrainConnection(t *testing.T) {
240 s := RunDefaultServer()
241 defer s.Shutdown()
242
243 done := make(chan bool)
244 rdone := make(chan bool)
245
246 closed := func(nc *nats.Conn) {
247 done <- true
248 }
249
250 url := fmt.Sprintf("nats://127.0.0.1:%d", nats.DefaultPort)
251 nc, err := nats.Connect(url, nats.ClosedHandler(closed))
252 if err != nil {
253 t.Fatalf("Failed to create default connection: %v", err)
254 }
255 defer nc.Close()
256
257 nc2, err := nats.Connect(url)
258 if err != nil {
259 t.Fatalf("Failed to create default connection: %v", err)
260 }
261 defer nc2.Close()
262
263 received := int32(0)
264 responses := int32(0)
265 expected := int32(50)
266 sleep := 10 * time.Millisecond
267
268 // Create the listener for responses on "bar"
269 _, err = nc2.Subscribe("bar", func(_ *nats.Msg) {
270 r := atomic.AddInt32(&responses, 1)
271 if r == expected {
272 rdone <- true
273 }
274 })
275 if err != nil {
276 t.Fatalf("Error creating subscription for responses: %v", err)
277 }
278
279 // Create a slow subscriber for the responder
280 sub, err := nc.Subscribe("foo", func(m *nats.Msg) {
281 time.Sleep(sleep)
282 atomic.AddInt32(&received, 1)
283 err := nc.Publish(m.Reply, []byte("Stop bugging me"))
284 if err != nil {
285 t.Errorf("Publisher received an error sending response: %v\n", err)
286 }
287 })
288 if err != nil {
289 t.Fatalf("Error creating subscription; %v", err)
290 }
291
292 // Publish some messages
293 for i := int32(0); i < expected; i++ {
294 nc.PublishRequest("foo", "bar", []byte("Slow Slow"))
295 }
296

Callers

nothing calls this directly

Calls 12

Connect · Method · 0.80
ClosedHandler · Method · 0.80
Fatalf · Method · 0.80
Errorf · Method · 0.80
Unsubscribe · Method · 0.80
Duration · Method · 0.80
RunDefaultServer · Function · 0.70
Subscribe · Method · 0.65
Publish · Method · 0.65
Drain · Method · 0.65
Close · Method · 0.45
PublishRequest · Method · 0.45

Tested by

no test coverage detected