handleStatusMsg processes a status message from the server. It returns a terminal error (caller should stop) and a non-terminal error to notify the user about via ErrHandler. The caller should invoke ErrHandler outside the lock to avoid deadlocks.
(msg *nats.Msg, msgErr error)
| 718 | // error to notify the user about via ErrHandler. The caller should invoke |
| 719 | // ErrHandler outside the lock to avoid deadlocks. |
| 720 | func (s *pullSubscription) handleStatusMsg(msg *nats.Msg, msgErr error) (error, error) { |
| 721 | if !errors.Is(msgErr, nats.ErrTimeout) && !errors.Is(msgErr, ErrMaxBytesExceeded) && !errors.Is(msgErr, ErrBatchCompleted) { |
| 722 | if errors.Is(msgErr, ErrConsumerDeleted) || errors.Is(msgErr, ErrBadRequest) { |
| 723 | return msgErr, nil |
| 724 | } |
| 725 | if errors.Is(msgErr, ErrPinIDMismatch) { |
| 726 | s.consumer.setPinID("") |
| 727 | s.pending.msgCount = 0 |
| 728 | s.pending.byteCount = 0 |
| 729 | } |
| 730 | if errors.Is(msgErr, ErrConsumerLeadershipChanged) { |
| 731 | s.pending.msgCount = 0 |
| 732 | s.pending.byteCount = 0 |
| 733 | } |
| 734 | return nil, msgErr |
| 735 | } |
| 736 | msgsLeft, bytesLeft, err := parsePending(msg) |
| 737 | if err != nil { |
| 738 | return err, nil |
| 739 | } |
| 740 | s.pending.msgCount -= msgsLeft |
| 741 | if s.pending.msgCount < 0 { |
| 742 | s.pending.msgCount = 0 |
| 743 | } |
| 744 | if s.consumeOpts.MaxBytes > 0 && !s.consumeOpts.LimitSize { |
| 745 | s.pending.byteCount -= bytesLeft |
| 746 | if s.pending.byteCount < 0 { |
| 747 | s.pending.byteCount = 0 |
| 748 | } |
| 749 | } |
| 750 | return nil, nil |
| 751 | } |
| 752 | |
| 753 | func (hb *hbMonitor) Stop() { |
| 754 | hb.Lock() |
no test coverage detected