MCPcopy
hub / github.com/nats-io/nats.go / processMsg

Method processMsg

nats.go:3670–3904  ·  view source on GitHub ↗

processMsg is called by parse and will place the msg on the appropriate channel/pending queue for processing. If the channel is full, or the pending queue is over the pending limits, the connection is considered a slow consumer.

(data []byte)

Source from the content-addressed store, hash-verified

3668// or the pending queue is over the pending limits, the connection is
3669// considered a slow consumer.
3670func (nc *Conn) processMsg(data []byte) {
3671 // Stats
3672 atomic.AddUint64(&nc.InMsgs, 1)
3673 atomic.AddUint64(&nc.InBytes, uint64(len(data)))
3674
3675 // Don't lock the connection to avoid server cutting us off if the
3676 // flusher is holding the connection lock, trying to send to the server
3677 // that is itself trying to send data to us.
3678 nc.subsMu.RLock()
3679 sub := nc.subs[nc.ps.ma.sid]
3680 var mf msgFilter
3681 if nc.filters != nil {
3682 mf = nc.filters[string(nc.ps.ma.subject)]
3683 }
3684 nc.subsMu.RUnlock()
3685
3686 if sub == nil {
3687 return
3688 }
3689
3690 // Copy them into string
3691 subj := string(nc.ps.ma.subject)
3692 reply := string(nc.ps.ma.reply)
3693
3694 // Doing message create outside of the sub's lock to reduce contention.
3695 // It's possible that we end-up not using the message, but that's ok.
3696
3697 // FIXME(dlc): Need to copy, should/can do COW?
3698 msgPayload := data
3699 if !nc.ps.msgCopied {
3700 msgPayload = make([]byte, len(data))
3701 copy(msgPayload, data)
3702 }
3703
3704 // Check if we have headers encoded here.
3705 var h Header
3706 var err error
3707 var ctrlMsg bool
3708 var ctrlType int
3709 var fcReply string
3710
3711 if nc.ps.ma.hdr > 0 {
3712 hbuf := msgPayload[:nc.ps.ma.hdr]
3713 msgPayload = msgPayload[nc.ps.ma.hdr:]
3714 h, err = DecodeHeadersMsg(hbuf)
3715 if err != nil {
3716 // We will pass the message through but send async error.
3717 nc.mu.Lock()
3718 nc.err = ErrBadHeaderMsg
3719 if asyncErrorCB := nc.Opts.AsyncErrorCB; asyncErrorCB != nil {
3720 nc.ach.push(func() { asyncErrorCB(nc, sub, ErrBadHeaderMsg) })
3721 }
3722 nc.mu.Unlock()
3723 }
3724 }
3725
3726 // FIXME(dlc): Should we recycle these containers?
3727 m := &Msg{

Callers 1

parseMethod · 0.95

Calls 13

PublishMethod · 0.95
removeSubMethod · 0.95
DecodeHeadersMsgFunction · 0.85
isJSControlMessageFunction · 0.85
checkOrderedMsgsMethod · 0.80
trackSequencesMethod · 0.80
getJSDeliveredMethod · 0.80
changeSubStatusMethod · 0.80
GetMethod · 0.65

Tested by

no test coverage detected