waitForMsgs waits on the condition variable shared with readLoop and processMsg. It is used to deliver messages to asynchronous subscribers.
(s *Subscription)
| 3569 | // waitForMsgs waits on the condition variable shared with readLoop and processMsg. |
| 3570 | // It is used to deliver messages to asynchronous subscribers. |
| 3571 | func (nc *Conn) waitForMsgs(s *Subscription) { |
| 3572 | var closed bool |
| 3573 | var delivered, max uint64 |
| 3574 | |
| 3575 | // Used to account for adjustments to sub.pBytes when we wrap back around. |
| 3576 | msgLen := -1 |
| 3577 | |
| 3578 | for { |
| 3579 | s.mu.Lock() |
| 3580 | // Do accounting for last msg delivered here so we only lock once |
| 3581 | // and drain state trips after callback has returned. |
| 3582 | if msgLen >= 0 { |
| 3583 | s.pMsgs-- |
| 3584 | s.pBytes -= msgLen |
| 3585 | msgLen = -1 |
| 3586 | } |
| 3587 | |
| 3588 | if s.pHead == nil && !s.closed { |
| 3589 | s.pCond.Wait() |
| 3590 | } |
| 3591 | // Pop the msg off the list |
| 3592 | m := s.pHead |
| 3593 | if m != nil { |
| 3594 | s.pHead = m.next |
| 3595 | if s.pHead == nil { |
| 3596 | s.pTail = nil |
| 3597 | } |
| 3598 | if m.barrier != nil { |
| 3599 | s.mu.Unlock() |
| 3600 | if atomic.AddInt64(&m.barrier.refs, -1) == 0 { |
| 3601 | m.barrier.f() |
| 3602 | } |
| 3603 | continue |
| 3604 | } |
| 3605 | msgLen = len(m.Data) |
| 3606 | } |
| 3607 | mcb := s.mcb |
| 3608 | max = s.max |
| 3609 | closed = s.closed |
| 3610 | var fcReply string |
| 3611 | if !s.closed { |
| 3612 | s.delivered++ |
| 3613 | delivered = s.delivered |
| 3614 | if s.jsi != nil { |
| 3615 | fcReply = s.checkForFlowControlResponse() |
| 3616 | } |
| 3617 | } |
| 3618 | s.mu.Unlock() |
| 3619 | |
| 3620 | // Respond to flow control if applicable |
| 3621 | if fcReply != _EMPTY_ { |
| 3622 | nc.Publish(fcReply, nil) |
| 3623 | } |
| 3624 | |
| 3625 | if closed { |
| 3626 | break |
| 3627 | } |
| 3628 |
no test coverage detected