Checks for activity from our consumer. If we do not think we are active, send an async error.
()
| 2346 | // Checks for activity from our consumer. |
| 2347 | // If we do not think we are active send an async error. |
| 2348 | func (sub *Subscription) activityCheck() { |
| 2349 | sub.mu.Lock() |
| 2350 | jsi := sub.jsi |
| 2351 | if jsi == nil || sub.closed { |
| 2352 | sub.mu.Unlock() |
| 2353 | return |
| 2354 | } |
| 2355 | |
| 2356 | active := jsi.active |
| 2357 | jsi.hbc.Reset(jsi.hbi * hbcThresh) |
| 2358 | jsi.active = false |
| 2359 | nc := sub.conn |
| 2360 | sub.mu.Unlock() |
| 2361 | |
| 2362 | if !active { |
| 2363 | if !jsi.ordered || nc.Status() != CONNECTED { |
| 2364 | nc.mu.Lock() |
| 2365 | if errCB := nc.Opts.AsyncErrorCB; errCB != nil { |
| 2366 | nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) }) |
| 2367 | } |
| 2368 | nc.mu.Unlock() |
| 2369 | return |
| 2370 | } |
| 2371 | sub.mu.Lock() |
| 2372 | sub.resetOrderedConsumer(jsi.sseq + 1) |
| 2373 | sub.mu.Unlock() |
| 2374 | } |
| 2375 | } |
| 2376 | |
| 2377 | // scheduleHeartbeatCheck sets up the timer check to make sure we are active |
| 2378 | // or receiving idle heartbeats. |
nothing calls this directly
no test coverage detected