unsubscribe performs the low-level unsubscribe to the server. Callers should use Subscription.Unsubscribe() instead.
(sub *Subscription, max int, drainMode bool)
| 5289 | // unsubscribe performs the low level unsubscribe to the server. |
| 5290 | // Use Subscription.Unsubscribe() |
| 5291 | func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error { |
| 5292 | var maxStr string |
| 5293 | if max > 0 { |
| 5294 | sub.mu.Lock() |
| 5295 | sub.max = uint64(max) |
| 5296 | if sub.delivered < sub.max { |
| 5297 | maxStr = strconv.Itoa(max) |
| 5298 | } |
| 5299 | sub.mu.Unlock() |
| 5300 | } |
| 5301 | |
| 5302 | nc.mu.Lock() |
| 5303 | // ok here, but defer is expensive |
| 5304 | defer nc.mu.Unlock() |
| 5305 | |
| 5306 | if nc.isClosed() { |
| 5307 | return ErrConnectionClosed |
| 5308 | } |
| 5309 | |
| 5310 | nc.subsMu.RLock() |
| 5311 | s := nc.subs[sub.sid] |
| 5312 | nc.subsMu.RUnlock() |
| 5313 | // Already unsubscribed |
| 5314 | if s == nil { |
| 5315 | return nil |
| 5316 | } |
| 5317 | |
| 5318 | if maxStr == _EMPTY_ && !drainMode { |
| 5319 | nc.removeSub(s) |
| 5320 | } |
| 5321 | |
| 5322 | if drainMode { |
| 5323 | s.mu.Lock() |
| 5324 | s.draining = true |
| 5325 | sub.changeSubStatus(SubscriptionDraining) |
| 5326 | s.mu.Unlock() |
| 5327 | go nc.checkDrained(sub) |
| 5328 | } |
| 5329 | |
| 5330 | // We will send these for all subs when we reconnect |
| 5331 | // so that we can suppress here. |
| 5332 | if !nc.isReconnecting() { |
| 5333 | nc.bw.appendString(fmt.Sprintf(unsubProto, s.sid, maxStr)) |
| 5334 | nc.kickFlusher() |
| 5335 | } |
| 5336 | |
| 5337 | // For JetStream subscriptions cancel the attached context if there is any. |
| 5338 | var cancel func() |
| 5339 | sub.mu.Lock() |
| 5340 | jsi := sub.jsi |
| 5341 | if jsi != nil { |
| 5342 | cancel = jsi.cancel |
| 5343 | jsi.cancel = nil |
| 5344 | } |
| 5345 | sub.mu.Unlock() |
| 5346 | if cancel != nil { |
| 5347 | cancel() |
| 5348 | } |
no test coverage detected