MCPcopy
hub / github.com/nats-io/nats.go / checkDrained

Method checkDrained

nats.go:5208–5261  ·  view source on GitHub ↗

checkDrained will watch for a subscription to be fully drained and then remove it.

(sub *Subscription)

Source from the content-addressed store, hash-verified

5206// checkDrained will watch for a subscription to be fully drained
5207// and then remove it.
5208func (nc *Conn) checkDrained(sub *Subscription) {
5209 if nc == nil || sub == nil {
5210 return
5211 }
5212 defer func() {
5213 sub.mu.Lock()
5214 defer sub.mu.Unlock()
5215 sub.draining = false
5216 }()
5217
5218 // This allows us to know that whatever we have in the client pending
5219 // is correct and the server will not send additional information.
5220 nc.Flush()
5221
5222 sub.mu.Lock()
5223 // For JS subscriptions, check if we are going to delete the
5224 // JS consumer when drain completes.
5225 dc := sub.jsi != nil && sub.jsi.dc
5226 sub.mu.Unlock()
5227
5228 // Once we are here we just wait for Pending to reach 0 or
5229 // any other state to exit this go routine.
5230 for {
5231 // check connection is still valid.
5232 if nc.IsClosed() {
5233 return
5234 }
5235
5236 // Check subscription state
5237 sub.mu.Lock()
5238 conn := sub.conn
5239 closed := sub.closed
5240 pMsgs := sub.pMsgs
5241 sub.mu.Unlock()
5242
5243 if conn == nil || closed || pMsgs == 0 {
5244 nc.mu.Lock()
5245 nc.removeSub(sub)
5246 nc.mu.Unlock()
5247 if dc {
5248 if err := sub.deleteConsumer(); err != nil {
5249 nc.mu.Lock()
5250 if errCB := nc.Opts.AsyncErrorCB; errCB != nil {
5251 nc.ach.push(func() { errCB(nc, sub, err) })
5252 }
5253 nc.mu.Unlock()
5254 }
5255 }
5256 return
5257 }
5258
5259 time.Sleep(100 * time.Millisecond)
5260 }
5261}
5262
5263// AutoUnsubscribe will issue an automatic Unsubscribe that is
5264// processed by the server when max messages have been received.

Callers 1

unsubscribeMethod · 0.95

Calls 5

FlushMethod · 0.95
IsClosedMethod · 0.95
removeSubMethod · 0.95
deleteConsumerMethod · 0.80
pushMethod · 0.45

Tested by

no test coverage detected