MCPcopy
hub / github.com/nats-io/nats.go / handleStatusMsg

Method handleStatusMsg

jetstream/pull.go:720–751  ·  view source on GitHub ↗

handleStatusMsg processes a status message from the server. It returns a terminal error (caller should stop) and a non-terminal error to notify the user about via ErrHandler. The caller should invoke ErrHandler outside the lock to avoid deadlocks.

(msg *nats.Msg, msgErr error)

Source from the content-addressed store, hash-verified

718// error to notify the user about via ErrHandler. The caller should invoke
719// ErrHandler outside the lock to avoid deadlocks.
720func (s *pullSubscription) handleStatusMsg(msg *nats.Msg, msgErr error) (error, error) {
721 if !errors.Is(msgErr, nats.ErrTimeout) && !errors.Is(msgErr, ErrMaxBytesExceeded) && !errors.Is(msgErr, ErrBatchCompleted) {
722 if errors.Is(msgErr, ErrConsumerDeleted) || errors.Is(msgErr, ErrBadRequest) {
723 return msgErr, nil
724 }
725 if errors.Is(msgErr, ErrPinIDMismatch) {
726 s.consumer.setPinID("")
727 s.pending.msgCount = 0
728 s.pending.byteCount = 0
729 }
730 if errors.Is(msgErr, ErrConsumerLeadershipChanged) {
731 s.pending.msgCount = 0
732 s.pending.byteCount = 0
733 }
734 return nil, msgErr
735 }
736 msgsLeft, bytesLeft, err := parsePending(msg)
737 if err != nil {
738 return err, nil
739 }
740 s.pending.msgCount -= msgsLeft
741 if s.pending.msgCount < 0 {
742 s.pending.msgCount = 0
743 }
744 if s.consumeOpts.MaxBytes > 0 && !s.consumeOpts.LimitSize {
745 s.pending.byteCount -= bytesLeft
746 if s.pending.byteCount < 0 {
747 s.pending.byteCount = 0
748 }
749 }
750 return nil, nil
751}
752
753func (hb *hbMonitor) Stop() {
754 hb.Lock()

Callers 2

ConsumeMethod · 0.95
NextMethod · 0.95

Calls 3

parsePendingFunction · 0.85
setPinIDMethod · 0.80
IsMethod · 0.45

Tested by

no test coverage detected