(t *testing.T)
| 315 | } |
| 316 | |
| 317 | func TestQueueSubsOnReconnect(t *testing.T) { |
| 318 | ts := startReconnectServer(t) |
| 319 | defer ts.Shutdown() |
| 320 | |
| 321 | opts := reconnectOpts |
| 322 | |
| 323 | // Allow us to block on reconnect complete. |
| 324 | reconnectsDone := make(chan bool) |
| 325 | opts.ReconnectedCB = func(nc *nats.Conn) { |
| 326 | reconnectsDone <- true |
| 327 | } |
| 328 | |
| 329 | // Create connection |
| 330 | nc, err := opts.Connect() |
| 331 | if err != nil { |
| 332 | t.Fatalf("Should have connected ok: %v\n", err) |
| 333 | } |
| 334 | defer nc.Close() |
| 335 | |
| 336 | // To hold results. |
| 337 | results := make(map[int]int) |
| 338 | var mu sync.Mutex |
| 339 | |
| 340 | // Make sure we got what we needed: exactly 1 msg per seqno, with all seqnos accounted for. |
| 341 | checkResults := func(numSent int) { |
| 342 | mu.Lock() |
| 343 | defer mu.Unlock() |
| 344 | |
| 345 | for i := 0; i < numSent; i++ { |
| 346 | if results[i] != 1 { |
| 347 | t.Fatalf("Received incorrect number of messages, [%d] for seq: %d\n", results[i], i) |
| 348 | } |
| 349 | } |
| 350 | |
| 351 | // Auto reset results map |
| 352 | results = make(map[int]int) |
| 353 | } |
| 354 | |
| 355 | subj := "foo.bar" |
| 356 | qgroup := "workers" |
| 357 | |
| 358 | cb := func(m *nats.Msg) { |
| 359 | mu.Lock() |
| 360 | defer mu.Unlock() |
| 361 | seqno, err := strconv.Atoi(string(m.Data)) |
| 362 | if err != nil { |
| 363 | t.Fatalf("Received an invalid sequence number: %v\n", err) |
| 364 | } |
| 365 | results[seqno] = results[seqno] + 1 |
| 366 | } |
| 367 | |
| 368 | // Create Queue Subscribers |
| 369 | nc.QueueSubscribe(subj, qgroup, cb) |
| 370 | nc.QueueSubscribe(subj, qgroup, cb) |
| 371 | |
| 372 | nc.Flush() |
| 373 | |
| 374 | // Helper function to send messages and check results. |
nothing calls this directly
no test coverage detected