MCPcopy
hub / github.com/nats-io/nats.go / processNextMsgDelivered

Method processNextMsgDelivered

nats.go:5539–5578  ·  view source on GitHub ↗

processNextMsgDelivered takes a message and applies the needed accounting to the subscription's stats, returning an error if the maximum number of messages has already been delivered. It should not be called while holding the lock.

(msg *Msg)

Source from the content-addressed store, hash-verified

5537// error in case we have the maximum number of messages have been
5538// delivered already. It should not be called while holding the lock.
5539func (s *Subscription) processNextMsgDelivered(msg *Msg) error {
5540 s.mu.Lock()
5541 nc := s.conn
5542 max := s.max
5543
5544 var fcReply string
5545 // Update some stats.
5546 s.delivered++
5547 delivered := s.delivered
5548 if s.jsi != nil {
5549 fcReply = s.checkForFlowControlResponse()
5550 }
5551
5552 if s.typ == SyncSubscription {
5553 s.pMsgs--
5554 s.pBytes -= len(msg.Data)
5555 }
5556 s.mu.Unlock()
5557
5558 if fcReply != _EMPTY_ {
5559 nc.Publish(fcReply, nil)
5560 }
5561
5562 if max > 0 {
5563 if delivered > max {
5564 return ErrMaxMessages
5565 }
5566 // Remove subscription if we have reached max.
5567 if delivered == max {
5568 nc.mu.Lock()
5569 nc.removeSub(s)
5570 nc.mu.Unlock()
5571 }
5572 }
5573 if len(msg.Data) == 0 && msg.Header.Get(statusHdr) == noResponders {
5574 return ErrNoResponders
5575 }
5576
5577 return nil
5578}
5579
5580// Queued returns the number of queued messages in the client for this subscription.
5581//

Callers 3

NextMsgMethod · 0.95
nextMsgNoTimeoutMethod · 0.95
nextMsgWithContextMethod · 0.95

Calls 4

removeSubMethod · 0.80
PublishMethod · 0.65
GetMethod · 0.65

Tested by

no test coverage detected