respHandler is the global response handler. It will look up the appropriate channel based on the last token and place the message on the channel if possible.
(m *Msg)
| 4540 | // the appropriate channel based on the last token and place |
| 4541 | // the message on the channel if possible. |
| 4542 | func (nc *Conn) respHandler(m *Msg) { |
| 4543 | nc.mu.Lock() |
| 4544 | |
| 4545 | // Just return if closed. |
| 4546 | if nc.isClosed() { |
| 4547 | nc.mu.Unlock() |
| 4548 | return |
| 4549 | } |
| 4550 | |
| 4551 | var mch chan *Msg |
| 4552 | |
| 4553 | // Grab mch |
| 4554 | rt := nc.respToken(m.Subject) |
| 4555 | if rt != _EMPTY_ { |
| 4556 | mch = nc.respMap[rt] |
| 4557 | // Delete the key regardless, one response only. |
| 4558 | delete(nc.respMap, rt) |
| 4559 | } else if len(nc.respMap) == 1 { |
| 4560 | // If the server has rewritten the subject, the response token (rt) |
| 4561 | // will not match (could be the case with JetStream). If that is the |
| 4562 | // case and there is a single entry, use that. |
| 4563 | for k, v := range nc.respMap { |
| 4564 | mch = v |
| 4565 | delete(nc.respMap, k) |
| 4566 | break |
| 4567 | } |
| 4568 | } |
| 4569 | nc.mu.Unlock() |
| 4570 | |
| 4571 | // Don't block, let Request timeout instead, mch is |
| 4572 | // buffered and we should delete the key before a |
| 4573 | // second response is processed. |
| 4574 | select { |
| 4575 | case mch <- m: |
| 4576 | default: |
| 4577 | return |
| 4578 | } |
| 4579 | } |
| 4580 | |
| 4581 | // Helper to setup and send new request style requests. Return the chan to receive the response. |
| 4582 | func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Msg, string, error) { |