MCPcopy
hub / github.com/nats-io/nats.go / TestQueueSubsOnReconnect

Function TestQueueSubsOnReconnect

test/reconnect_test.go:317–401  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

315}
316
317func TestQueueSubsOnReconnect(t *testing.T) {
318 ts := startReconnectServer(t)
319 defer ts.Shutdown()
320
321 opts := reconnectOpts
322
323 // Allow us to block on reconnect complete.
324 reconnectsDone := make(chan bool)
325 opts.ReconnectedCB = func(nc *nats.Conn) {
326 reconnectsDone <- true
327 }
328
329 // Create connection
330 nc, err := opts.Connect()
331 if err != nil {
332 t.Fatalf("Should have connected ok: %v\n", err)
333 }
334 defer nc.Close()
335
336 // To hold results.
337 results := make(map[int]int)
338 var mu sync.Mutex
339
340 // Make sure we got what we needed, 1 msg only and all seqnos accounted for..
341 checkResults := func(numSent int) {
342 mu.Lock()
343 defer mu.Unlock()
344
345 for i := 0; i < numSent; i++ {
346 if results[i] != 1 {
347 t.Fatalf("Received incorrect number of messages, [%d] for seq: %d\n", results[i], i)
348 }
349 }
350
351 // Auto reset results map
352 results = make(map[int]int)
353 }
354
355 subj := "foo.bar"
356 qgroup := "workers"
357
358 cb := func(m *nats.Msg) {
359 mu.Lock()
360 defer mu.Unlock()
361 seqno, err := strconv.Atoi(string(m.Data))
362 if err != nil {
363 t.Fatalf("Received an invalid sequence number: %v\n", err)
364 }
365 results[seqno] = results[seqno] + 1
366 }
367
368 // Create Queue Subscribers
369 nc.QueueSubscribe(subj, qgroup, cb)
370 nc.QueueSubscribe(subj, qgroup, cb)
371
372 nc.Flush()
373
374 // Helper function to send messages and check results.

Callers

nothing calls this directly

Calls 8

startReconnectServer (Function) · 0.85
Connect (Method) · 0.80
Fatalf (Method) · 0.80
Wait (Function) · 0.70
QueueSubscribe (Method) · 0.65
Publish (Method) · 0.65
Close (Method) · 0.45
Flush (Method) · 0.45

Tested by

no test coverage detected