checkDrained will watch for a subscription to be fully drained and then remove it.
(sub *Subscription)
| 5206 | // checkDrained will watch for a subscription to be fully drained |
| 5207 | // and then remove it. |
| 5208 | func (nc *Conn) checkDrained(sub *Subscription) { |
| 5209 | if nc == nil || sub == nil { |
| 5210 | return |
| 5211 | } |
| 5212 | defer func() { |
| 5213 | sub.mu.Lock() |
| 5214 | defer sub.mu.Unlock() |
| 5215 | sub.draining = false |
| 5216 | }() |
| 5217 | |
| 5218 | // This allows us to know that whatever we have in the client pending |
| 5219 | // is correct and the server will not send additional information. |
| 5220 | nc.Flush() |
| 5221 | |
| 5222 | sub.mu.Lock() |
| 5223 | // For JS subscriptions, check if we are going to delete the |
| 5224 | // JS consumer when drain completes. |
| 5225 | dc := sub.jsi != nil && sub.jsi.dc |
| 5226 | sub.mu.Unlock() |
| 5227 | |
| 5228 | // Once we are here we just wait for Pending to reach 0 or |
| 5229 | // any other state to exit this go routine. |
| 5230 | for { |
| 5231 | // check connection is still valid. |
| 5232 | if nc.IsClosed() { |
| 5233 | return |
| 5234 | } |
| 5235 | |
| 5236 | // Check subscription state |
| 5237 | sub.mu.Lock() |
| 5238 | conn := sub.conn |
| 5239 | closed := sub.closed |
| 5240 | pMsgs := sub.pMsgs |
| 5241 | sub.mu.Unlock() |
| 5242 | |
| 5243 | if conn == nil || closed || pMsgs == 0 { |
| 5244 | nc.mu.Lock() |
| 5245 | nc.removeSub(sub) |
| 5246 | nc.mu.Unlock() |
| 5247 | if dc { |
| 5248 | if err := sub.deleteConsumer(); err != nil { |
| 5249 | nc.mu.Lock() |
| 5250 | if errCB := nc.Opts.AsyncErrorCB; errCB != nil { |
| 5251 | nc.ach.push(func() { errCB(nc, sub, err) }) |
| 5252 | } |
| 5253 | nc.mu.Unlock() |
| 5254 | } |
| 5255 | } |
| 5256 | return |
| 5257 | } |
| 5258 | |
| 5259 | time.Sleep(100 * time.Millisecond) |
| 5260 | } |
| 5261 | } |
| 5262 | |
| 5263 | // AutoUnsubscribe will issue an automatic Unsubscribe that is |
| 5264 | // processed by the server when max messages have been received. |
no test coverage detected