processNextMsgDelivered takes a message and applies the needed accounting to the stats from the subscription, returning an error in case we have the maximum number of messages have been delivered already. It should not be called while holding the lock.
(msg *Msg)
| 5537 | // error in case we have the maximum number of messages have been |
| 5538 | // delivered already. It should not be called while holding the lock. |
| 5539 | func (s *Subscription) processNextMsgDelivered(msg *Msg) error { |
| 5540 | s.mu.Lock() |
| 5541 | nc := s.conn |
| 5542 | max := s.max |
| 5543 | |
| 5544 | var fcReply string |
| 5545 | // Update some stats. |
| 5546 | s.delivered++ |
| 5547 | delivered := s.delivered |
| 5548 | if s.jsi != nil { |
| 5549 | fcReply = s.checkForFlowControlResponse() |
| 5550 | } |
| 5551 | |
| 5552 | if s.typ == SyncSubscription { |
| 5553 | s.pMsgs-- |
| 5554 | s.pBytes -= len(msg.Data) |
| 5555 | } |
| 5556 | s.mu.Unlock() |
| 5557 | |
| 5558 | if fcReply != _EMPTY_ { |
| 5559 | nc.Publish(fcReply, nil) |
| 5560 | } |
| 5561 | |
| 5562 | if max > 0 { |
| 5563 | if delivered > max { |
| 5564 | return ErrMaxMessages |
| 5565 | } |
| 5566 | // Remove subscription if we have reached max. |
| 5567 | if delivered == max { |
| 5568 | nc.mu.Lock() |
| 5569 | nc.removeSub(s) |
| 5570 | nc.mu.Unlock() |
| 5571 | } |
| 5572 | } |
| 5573 | if len(msg.Data) == 0 && msg.Header.Get(statusHdr) == noResponders { |
| 5574 | return ErrNoResponders |
| 5575 | } |
| 5576 | |
| 5577 | return nil |
| 5578 | } |
| 5579 | |
| 5580 | // Queued returns the number of queued messages in the client for this subscription. |
| 5581 | // |
no test coverage detected