We are here if we have detected a gap with an ordered consumer. We will create a new consumer and rewire the low-level subscription. The subscription lock should be held by the caller.
(sseq uint64)
| 2197 | // We will create a new consumer and rewire the low level subscription. |
| 2198 | // Lock should be held. |
| 2199 | func (sub *Subscription) resetOrderedConsumer(sseq uint64) { |
| 2200 | nc := sub.conn |
| 2201 | if sub.jsi == nil || nc == nil || sub.closed { |
| 2202 | return |
| 2203 | } |
| 2204 | |
| 2205 | var maxStr string |
| 2206 | // If there was an AUTO_UNSUB done, we need to adjust the new value |
| 2207 | // to send after the SUB for the new sid. |
| 2208 | if sub.max > 0 { |
| 2209 | if sub.jsi.fciseq < sub.max { |
| 2210 | adjustedMax := sub.max - sub.jsi.fciseq |
| 2211 | maxStr = strconv.Itoa(int(adjustedMax)) |
| 2212 | } else { |
| 2213 | // We are already at the max, so we should just unsub the |
| 2214 | // existing sub and be done |
| 2215 | go func(sid int64) { |
| 2216 | nc.mu.Lock() |
| 2217 | nc.bw.appendString(fmt.Sprintf(unsubProto, sid, _EMPTY_)) |
| 2218 | nc.kickFlusher() |
| 2219 | nc.mu.Unlock() |
| 2220 | }(sub.sid) |
| 2221 | return |
| 2222 | } |
| 2223 | } |
| 2224 | |
| 2225 | // Quick unsubscribe. Since we know this is a simple push subscriber we do in place. |
| 2226 | osid := sub.applyNewSID() |
| 2227 | |
| 2228 | // Grab new inbox. |
| 2229 | newDeliver := nc.NewInbox() |
| 2230 | sub.Subject = newDeliver |
| 2231 | |
| 2232 | // Snapshot the new sid under sub lock. |
| 2233 | nsid := sub.sid |
| 2234 | |
| 2235 | // We are still in the low level readLoop for the connection so we need |
| 2236 | // to spin a go routine to try to create the new consumer. |
| 2237 | go func() { |
| 2238 | // Unsubscribe and subscribe with new inbox and sid. |
| 2239 | // Remap a new low level sub into this sub since its client accessible. |
| 2240 | // This is done here in this go routine to prevent lock inversion. |
| 2241 | nc.mu.Lock() |
| 2242 | nc.bw.appendString(fmt.Sprintf(unsubProto, osid, _EMPTY_)) |
| 2243 | nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, nsid)) |
| 2244 | if maxStr != _EMPTY_ { |
| 2245 | nc.bw.appendString(fmt.Sprintf(unsubProto, nsid, maxStr)) |
| 2246 | } |
| 2247 | nc.kickFlusher() |
| 2248 | nc.mu.Unlock() |
| 2249 | |
| 2250 | pushErr := func(err error) { |
| 2251 | nc.handleConsumerSequenceMismatch(sub, fmt.Errorf("%w: recreating ordered consumer", err)) |
| 2252 | nc.unsubscribe(sub, 0, true) |
| 2253 | } |
| 2254 | |
| 2255 | sub.mu.Lock() |
| 2256 | jsi := sub.jsi |
no test coverage detected