MCPcopy
hub / github.com/nats-io/nats.go / checkForSequenceMismatch

Method checkForSequenceMismatch

js.go:2406–2449  ·  view source on GitHub ↗

checkForSequenceMismatch will make sure we have not missed any messages since last seen.

(msg *Msg, s *Subscription, jsi *jsSub)

Source from the content-addressed store, hash-verified

2404
2405// checkForSequenceMismatch will make sure we have not missed any messages since last seen.
2406func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) {
2407 // Process heartbeat received, get latest control metadata if present.
2408 s.mu.Lock()
2409 ctrl, ordered := jsi.cmeta, jsi.ordered
2410 jsi.active = true
2411 s.mu.Unlock()
2412
2413 if ctrl == _EMPTY_ {
2414 return
2415 }
2416
2417 tokens, err := parser.GetMetadataFields(ctrl)
2418 if err != nil {
2419 return
2420 }
2421
2422 // Consumer sequence.
2423 var ldseq string
2424 dseq := tokens[parser.AckConsumerSeqTokenPos]
2425 hdr := msg.Header[lastConsumerSeqHdr]
2426 if len(hdr) == 1 {
2427 ldseq = hdr[0]
2428 }
2429
2430 // Detect consumer sequence mismatch and whether
2431 // should restart the consumer.
2432 if ldseq != dseq {
2433 // Dispatch async error including details such as
2434 // from where the consumer could be restarted.
2435 sseq := parser.ParseNum(tokens[parser.AckStreamSeqTokenPos])
2436 if ordered {
2437 s.mu.Lock()
2438 s.resetOrderedConsumer(jsi.sseq + 1)
2439 s.mu.Unlock()
2440 } else {
2441 ecs := &ErrConsumerSequenceMismatch{
2442 StreamResumeSequence: uint64(sseq),
2443 ConsumerSequence: parser.ParseNum(dseq),
2444 LastConsumerSequence: parser.ParseNum(ldseq),
2445 }
2446 nc.handleConsumerSequenceMismatch(s, ecs)
2447 }
2448 }
2449}
2450
2451type streamRequest struct {
2452 Subject string `json:"subject,omitempty"`

Callers 1

processMsgMethod · 0.95

Calls 4

GetMetadataFieldsFunction · 0.92
ParseNumFunction · 0.92
resetOrderedConsumerMethod · 0.80

Tested by

no test coverage detected