checkForSequenceMismatch will make sure we have not missed any messages since last seen.
(msg *Msg, s *Subscription, jsi *jsSub)
| 2404 | |
| 2405 | // checkForSequenceMismatch will make sure we have not missed any messages since last seen. |
| 2406 | func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) { |
| 2407 | // Process heartbeat received, get latest control metadata if present. |
| 2408 | s.mu.Lock() |
| 2409 | ctrl, ordered := jsi.cmeta, jsi.ordered |
| 2410 | jsi.active = true |
| 2411 | s.mu.Unlock() |
| 2412 | |
| 2413 | if ctrl == _EMPTY_ { |
| 2414 | return |
| 2415 | } |
| 2416 | |
| 2417 | tokens, err := parser.GetMetadataFields(ctrl) |
| 2418 | if err != nil { |
| 2419 | return |
| 2420 | } |
| 2421 | |
| 2422 | // Consumer sequence. |
| 2423 | var ldseq string |
| 2424 | dseq := tokens[parser.AckConsumerSeqTokenPos] |
| 2425 | hdr := msg.Header[lastConsumerSeqHdr] |
| 2426 | if len(hdr) == 1 { |
| 2427 | ldseq = hdr[0] |
| 2428 | } |
| 2429 | |
| 2430 | // Detect consumer sequence mismatch and whether |
| 2431 | // should restart the consumer. |
| 2432 | if ldseq != dseq { |
| 2433 | // Dispatch async error including details such as |
| 2434 | // from where the consumer could be restarted. |
| 2435 | sseq := parser.ParseNum(tokens[parser.AckStreamSeqTokenPos]) |
| 2436 | if ordered { |
| 2437 | s.mu.Lock() |
| 2438 | s.resetOrderedConsumer(jsi.sseq + 1) |
| 2439 | s.mu.Unlock() |
| 2440 | } else { |
| 2441 | ecs := &ErrConsumerSequenceMismatch{ |
| 2442 | StreamResumeSequence: uint64(sseq), |
| 2443 | ConsumerSequence: parser.ParseNum(dseq), |
| 2444 | LastConsumerSequence: parser.ParseNum(ldseq), |
| 2445 | } |
| 2446 | nc.handleConsumerSequenceMismatch(s, ecs) |
| 2447 | } |
| 2448 | } |
| 2449 | } |
| 2450 | |
| 2451 | type streamRequest struct { |
| 2452 | Subject string `json:"subject,omitempty"` |
no test coverage detected