MCPcopy
hub / github.com/nats-io/nats.go / resetOrderedConsumer

Method resetOrderedConsumer

js.go:2199–2312  ·  view source on GitHub ↗

We are here if we have detected a gap with an ordered consumer. We will create a new consumer and rewire the low level subscription. Lock should be held.

(sseq uint64)

Source from the content-addressed store, hash-verified

2197// We will create a new consumer and rewire the low level subscription.
2198// Lock should be held.
2199func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
2200 nc := sub.conn
2201 if sub.jsi == nil || nc == nil || sub.closed {
2202 return
2203 }
2204
2205 var maxStr string
2206 // If there was an AUTO_UNSUB done, we need to adjust the new value
2207 // to send after the SUB for the new sid.
2208 if sub.max > 0 {
2209 if sub.jsi.fciseq < sub.max {
2210 adjustedMax := sub.max - sub.jsi.fciseq
2211 maxStr = strconv.Itoa(int(adjustedMax))
2212 } else {
2213 // We are already at the max, so we should just unsub the
2214 // existing sub and be done
2215 go func(sid int64) {
2216 nc.mu.Lock()
2217 nc.bw.appendString(fmt.Sprintf(unsubProto, sid, _EMPTY_))
2218 nc.kickFlusher()
2219 nc.mu.Unlock()
2220 }(sub.sid)
2221 return
2222 }
2223 }
2224
2225 // Quick unsubscribe. Since we know this is a simple push subscriber we do in place.
2226 osid := sub.applyNewSID()
2227
2228 // Grab new inbox.
2229 newDeliver := nc.NewInbox()
2230 sub.Subject = newDeliver
2231
2232 // Snapshot the new sid under sub lock.
2233 nsid := sub.sid
2234
2235 // We are still in the low level readLoop for the connection so we need
2236 // to spin a go routine to try to create the new consumer.
2237 go func() {
2238 // Unsubscribe and subscribe with new inbox and sid.
2239 // Remap a new low level sub into this sub since its client accessible.
2240 // This is done here in this go routine to prevent lock inversion.
2241 nc.mu.Lock()
2242 nc.bw.appendString(fmt.Sprintf(unsubProto, osid, _EMPTY_))
2243 nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, nsid))
2244 if maxStr != _EMPTY_ {
2245 nc.bw.appendString(fmt.Sprintf(unsubProto, nsid, maxStr))
2246 }
2247 nc.kickFlusher()
2248 nc.mu.Unlock()
2249
2250 pushErr := func(err error) {
2251 nc.handleConsumerSequenceMismatch(sub, fmt.Errorf("%w: recreating ordered consumer", err))
2252 nc.unsubscribe(sub, 0, true)
2253 }
2254
2255 sub.mu.Lock()
2256 jsi := sub.jsi

Callers 3

checkOrderedMsgsMethod · 0.95
activityCheckMethod · 0.95

Calls 13

applyNewSIDMethod · 0.95
getHashFunction · 0.85
ContextFunction · 0.85
appendStringMethod · 0.80
kickFlusherMethod · 0.80
NewInboxMethod · 0.80
ErrorfMethod · 0.80
unsubscribeMethod · 0.80
upsertConsumerMethod · 0.80
DeleteConsumerMethod · 0.65
NextMethod · 0.65

Tested by

no test coverage detected