MCPcopy
hub / github.com/nats-io/nats.go / drainConnection

Method drainConnection

nats.go:6077–6173  ·  view source on GitHub ↗

drainConnection will run in a separate goroutine and will flush all publishes and drain all active subscriptions.

()

Source from the content-addressed store, hash-verified

6075// drainConnection will run in a separate Go routine and will
6076// flush all publishes and drain all active subscriptions.
6077func (nc *Conn) drainConnection() {
6078 // Snapshot subs list.
6079 nc.mu.Lock()
6080
6081 // Check again here if we are in a state to not process.
6082 if nc.isClosed() {
6083 nc.mu.Unlock()
6084 return
6085 }
6086 if nc.isConnecting() || nc.isReconnecting() {
6087 nc.mu.Unlock()
6088 // Move to closed state.
6089 nc.Close()
6090 return
6091 }
6092
6093 subs := make([]*Subscription, 0, len(nc.subs))
6094 for _, s := range nc.subs {
6095 if s == nc.respMux {
6096 // Skip since might be in use while messages
6097 // are being processed (can miss responses).
6098 continue
6099 }
6100 subs = append(subs, s)
6101 }
6102 errCB := nc.Opts.AsyncErrorCB
6103 drainWait := nc.Opts.DrainTimeout
6104 respMux := nc.respMux
6105 nc.mu.Unlock()
6106
6107 // for pushing errors with context.
6108 pushErr := func(err error) {
6109 nc.mu.Lock()
6110 nc.err = err
6111 if errCB != nil {
6112 nc.ach.push(func() { errCB(nc, nil, err) })
6113 }
6114 nc.mu.Unlock()
6115 }
6116
6117 // Do subs first, skip request handler if present.
6118 for _, s := range subs {
6119 if err := s.Drain(); err != nil {
6120 // We will notify about these but continue.
6121 pushErr(err)
6122 }
6123 }
6124
6125 // Wait for the subscriptions to drop to zero.
6126 timeout := time.Now().Add(drainWait)
6127 var min int
6128 if respMux != nil {
6129 min = 1
6130 } else {
6131 min = 0
6132 }
6133 for time.Now().Before(timeout) {
6134 if nc.NumSubscriptions() == min {

Callers 1

DrainMethod · 0.95

Calls 11

isClosedMethod · 0.95
isConnectingMethod · 0.95
isReconnectingMethod · 0.95
CloseMethod · 0.95
DrainMethod · 0.95
NumSubscriptionsMethod · 0.95
changeConnStatusMethod · 0.95
FlushTimeoutMethod · 0.95
AddMethod · 0.65
DrainMethod · 0.65
pushMethod · 0.45

Tested by

no test coverage detected