MCPcopy
hub / github.com/nats-io/nats.go / chanSubcheckForFlowControlResponse

Method chanSubcheckForFlowControlResponse

js.go:2073–2097  ·  view source on GitHub ↗

This long-lived routine is used per ChanSubscription to check on the number of delivered messages and check for flow control response.

()

Source from the content-addressed store, hash-verified

2071// This long-lived routine is used per ChanSubscription to check
2072// on the number of delivered messages and check for flow control response.
2073func (sub *Subscription) chanSubcheckForFlowControlResponse() {
2074 sub.mu.Lock()
2075 // We don't use defer since if we need to send an RC reply, we need
2076 // to do it outside the sub's lock. So doing explicit unlock...
2077 if sub.closed {
2078 sub.mu.Unlock()
2079 return
2080 }
2081 var fcReply string
2082 var nc *Conn
2083
2084 jsi := sub.jsi
2085 if jsi.csfct == nil {
2086 jsi.csfct = time.AfterFunc(chanSubFCCheckInterval, sub.chanSubcheckForFlowControlResponse)
2087 } else {
2088 fcReply = sub.checkForFlowControlResponse()
2089 nc = sub.conn
2090 // Do the reset here under the lock, it's ok...
2091 jsi.csfct.Reset(chanSubFCCheckInterval)
2092 }
2093 sub.mu.Unlock()
2094 // This call will return an error (which we don't care here)
2095 // if nc is nil or fcReply is empty.
2096 nc.Publish(fcReply, nil)
2097}
2098
2099// ErrConsumerSequenceMismatch represents an error from a consumer
2100// that received a Heartbeat including sequence different to the

Callers 1

subscribeMethod · 0.80

Calls 3

PublishMethod · 0.95
ResetMethod · 0.65

Tested by

no test coverage detected