This long-lived routine is used per ChanSubscription to check on the number of delivered messages and check for flow control response.
()
| 2071 | // This long-lived routine is used per ChanSubscription to check |
| 2072 | // on the number of delivered messages and check for flow control response. |
| 2073 | func (sub *Subscription) chanSubcheckForFlowControlResponse() { |
| 2074 | sub.mu.Lock() |
| 2075 | // We don't use defer since if we need to send an RC reply, we need |
| 2076 | // to do it outside the sub's lock. So doing explicit unlock... |
| 2077 | if sub.closed { |
| 2078 | sub.mu.Unlock() |
| 2079 | return |
| 2080 | } |
| 2081 | var fcReply string |
| 2082 | var nc *Conn |
| 2083 | |
| 2084 | jsi := sub.jsi |
| 2085 | if jsi.csfct == nil { |
| 2086 | jsi.csfct = time.AfterFunc(chanSubFCCheckInterval, sub.chanSubcheckForFlowControlResponse) |
| 2087 | } else { |
| 2088 | fcReply = sub.checkForFlowControlResponse() |
| 2089 | nc = sub.conn |
| 2090 | // Do the reset here under the lock, it's ok... |
| 2091 | jsi.csfct.Reset(chanSubFCCheckInterval) |
| 2092 | } |
| 2093 | sub.mu.Unlock() |
| 2094 | // This call will return an error (which we don't care here) |
| 2095 | // if nc is nil or fcReply is empty. |
| 2096 | nc.Publish(fcReply, nil) |
| 2097 | } |
| 2098 | |
| 2099 | // ErrConsumerSequenceMismatch represents an error from a consumer |
| 2100 | // that received a Heartbeat including sequence different to the |
no test coverage detected