drainConnection will run in a separate Go routine and will flush all publishes and drain all active subscriptions.
()
| 6075 | // drainConnection will run in a separate Go routine and will |
| 6076 | // flush all publishes and drain all active subscriptions. |
| 6077 | func (nc *Conn) drainConnection() { |
| 6078 | // Snapshot subs list. |
| 6079 | nc.mu.Lock() |
| 6080 | |
| 6081 | // Check again here if we are in a state to not process. |
| 6082 | if nc.isClosed() { |
| 6083 | nc.mu.Unlock() |
| 6084 | return |
| 6085 | } |
| 6086 | if nc.isConnecting() || nc.isReconnecting() { |
| 6087 | nc.mu.Unlock() |
| 6088 | // Move to closed state. |
| 6089 | nc.Close() |
| 6090 | return |
| 6091 | } |
| 6092 | |
| 6093 | subs := make([]*Subscription, 0, len(nc.subs)) |
| 6094 | for _, s := range nc.subs { |
| 6095 | if s == nc.respMux { |
| 6096 | // Skip since might be in use while messages |
| 6097 | // are being processed (can miss responses). |
| 6098 | continue |
| 6099 | } |
| 6100 | subs = append(subs, s) |
| 6101 | } |
| 6102 | errCB := nc.Opts.AsyncErrorCB |
| 6103 | drainWait := nc.Opts.DrainTimeout |
| 6104 | respMux := nc.respMux |
| 6105 | nc.mu.Unlock() |
| 6106 | |
| 6107 | // for pushing errors with context. |
| 6108 | pushErr := func(err error) { |
| 6109 | nc.mu.Lock() |
| 6110 | nc.err = err |
| 6111 | if errCB != nil { |
| 6112 | nc.ach.push(func() { errCB(nc, nil, err) }) |
| 6113 | } |
| 6114 | nc.mu.Unlock() |
| 6115 | } |
| 6116 | |
| 6117 | // Do subs first, skip request handler if present. |
| 6118 | for _, s := range subs { |
| 6119 | if err := s.Drain(); err != nil { |
| 6120 | // We will notify about these but continue. |
| 6121 | pushErr(err) |
| 6122 | } |
| 6123 | } |
| 6124 | |
| 6125 | // Wait for the subscriptions to drop to zero. |
| 6126 | timeout := time.Now().Add(drainWait) |
| 6127 | var min int |
| 6128 | if respMux != nil { |
| 6129 | min = 1 |
| 6130 | } else { |
| 6131 | min = 0 |
| 6132 | } |
| 6133 | for time.Now().Before(timeout) { |
| 6134 | if nc.NumSubscriptions() == min { |
no test coverage detected