MCPcopy
hub / github.com/nats-io/nats.go / waitForMsgs

Method waitForMsgs

nats.go:3571–3660  ·  view source on GitHub ↗

waitForMsgs waits on the conditional shared with readLoop and processMsg. It is used to deliver messages to asynchronous subscribers.

(s *Subscription)

Source from the content-addressed store, hash-verified

3569// waitForMsgs waits on the conditional shared with readLoop and processMsg.
3570// It is used to deliver messages to asynchronous subscribers.
3571func (nc *Conn) waitForMsgs(s *Subscription) {
3572 var closed bool
3573 var delivered, max uint64
3574
3575 // Used to account for adjustments to sub.pBytes when we wrap back around.
3576 msgLen := -1
3577
3578 for {
3579 s.mu.Lock()
3580 // Do accounting for last msg delivered here so we only lock once
3581 // and drain state trips after callback has returned.
3582 if msgLen >= 0 {
3583 s.pMsgs--
3584 s.pBytes -= msgLen
3585 msgLen = -1
3586 }
3587
3588 if s.pHead == nil && !s.closed {
3589 s.pCond.Wait()
3590 }
3591 // Pop the msg off the list
3592 m := s.pHead
3593 if m != nil {
3594 s.pHead = m.next
3595 if s.pHead == nil {
3596 s.pTail = nil
3597 }
3598 if m.barrier != nil {
3599 s.mu.Unlock()
3600 if atomic.AddInt64(&m.barrier.refs, -1) == 0 {
3601 m.barrier.f()
3602 }
3603 continue
3604 }
3605 msgLen = len(m.Data)
3606 }
3607 mcb := s.mcb
3608 max = s.max
3609 closed = s.closed
3610 var fcReply string
3611 if !s.closed {
3612 s.delivered++
3613 delivered = s.delivered
3614 if s.jsi != nil {
3615 fcReply = s.checkForFlowControlResponse()
3616 }
3617 }
3618 s.mu.Unlock()
3619
3620 // Respond to flow control if applicable
3621 if fcReply != _EMPTY_ {
3622 nc.Publish(fcReply, nil)
3623 }
3624
3625 if closed {
3626 break
3627 }
3628

Callers 1

subscribeLockedMethod · 0.95

Calls 3

PublishMethod · 0.95
removeSubMethod · 0.95

Tested by

no test coverage detected