Check to make sure messages are arriving in order. Returns true if the sub had to be replaced. Will cause upper layers to return. The caller has verified that sub.jsi != nil and that this is not a control message. Lock should be held.
(m *Msg)
| 2151 | // The caller has verified that sub.jsi != nil and that this is not a control message. |
| 2152 | // Lock should be held. |
| 2153 | func (sub *Subscription) checkOrderedMsgs(m *Msg) bool { |
| 2154 | // Ignore msgs with no reply like HBs and flow control, they are handled elsewhere. |
| 2155 | if m.Reply == _EMPTY_ { |
| 2156 | return false |
| 2157 | } |
| 2158 | |
| 2159 | // Normal message here. |
| 2160 | tokens, err := parser.GetMetadataFields(m.Reply) |
| 2161 | if err != nil { |
| 2162 | return false |
| 2163 | } |
| 2164 | sseq, dseq := parser.ParseNum(tokens[parser.AckStreamSeqTokenPos]), parser.ParseNum(tokens[parser.AckConsumerSeqTokenPos]) |
| 2165 | |
| 2166 | jsi := sub.jsi |
| 2167 | if dseq != jsi.dseq { |
| 2168 | sub.resetOrderedConsumer(jsi.sseq + 1) |
| 2169 | return true |
| 2170 | } |
| 2171 | // Update our tracking here. |
| 2172 | jsi.dseq, jsi.sseq = dseq+1, sseq |
| 2173 | return false |
| 2174 | } |
| 2175 | |
| 2176 | // Update and replace sid. |
| 2177 | // Lock should be held on entry but will be unlocked to prevent lock inversion. |
no test coverage detected