processMsg is called by parse and will place the msg on the appropriate channel/pending queue for processing. If the channel is full, or the pending queue is over the pending limits, the connection is considered a slow consumer.
(data []byte)
| 3668 | // or the pending queue is over the pending limits, the connection is |
| 3669 | // considered a slow consumer. |
| 3670 | func (nc *Conn) processMsg(data []byte) { |
| 3671 | // Stats |
| 3672 | atomic.AddUint64(&nc.InMsgs, 1) |
| 3673 | atomic.AddUint64(&nc.InBytes, uint64(len(data))) |
| 3674 | |
| 3675 | // Don't lock the connection to avoid server cutting us off if the |
| 3676 | // flusher is holding the connection lock, trying to send to the server |
| 3677 | // that is itself trying to send data to us. |
| 3678 | nc.subsMu.RLock() |
| 3679 | sub := nc.subs[nc.ps.ma.sid] |
| 3680 | var mf msgFilter |
| 3681 | if nc.filters != nil { |
| 3682 | mf = nc.filters[string(nc.ps.ma.subject)] |
| 3683 | } |
| 3684 | nc.subsMu.RUnlock() |
| 3685 | |
| 3686 | if sub == nil { |
| 3687 | return |
| 3688 | } |
| 3689 | |
| 3690 | // Copy them into string |
| 3691 | subj := string(nc.ps.ma.subject) |
| 3692 | reply := string(nc.ps.ma.reply) |
| 3693 | |
| 3694 | // Doing message create outside of the sub's lock to reduce contention. |
| 3695 | // It's possible that we end-up not using the message, but that's ok. |
| 3696 | |
| 3697 | // FIXME(dlc): Need to copy, should/can do COW? |
| 3698 | msgPayload := data |
| 3699 | if !nc.ps.msgCopied { |
| 3700 | msgPayload = make([]byte, len(data)) |
| 3701 | copy(msgPayload, data) |
| 3702 | } |
| 3703 | |
| 3704 | // Check if we have headers encoded here. |
| 3705 | var h Header |
| 3706 | var err error |
| 3707 | var ctrlMsg bool |
| 3708 | var ctrlType int |
| 3709 | var fcReply string |
| 3710 | |
| 3711 | if nc.ps.ma.hdr > 0 { |
| 3712 | hbuf := msgPayload[:nc.ps.ma.hdr] |
| 3713 | msgPayload = msgPayload[nc.ps.ma.hdr:] |
| 3714 | h, err = DecodeHeadersMsg(hbuf) |
| 3715 | if err != nil { |
| 3716 | // We will pass the message through but send async error. |
| 3717 | nc.mu.Lock() |
| 3718 | nc.err = ErrBadHeaderMsg |
| 3719 | if asyncErrorCB := nc.Opts.AsyncErrorCB; asyncErrorCB != nil { |
| 3720 | nc.ach.push(func() { asyncErrorCB(nc, sub, ErrBadHeaderMsg) }) |
| 3721 | } |
| 3722 | nc.mu.Unlock() |
| 3723 | } |
| 3724 | } |
| 3725 | |
| 3726 | // FIXME(dlc): Should we recycle these containers? |
| 3727 | m := &Msg{ |
no test coverage detected