MCPcopy
hub / github.com/nats-io/nats.go / doReconnect

Method doReconnect

nats.go:3147–3425  ·  view source on GitHub ↗

Try to reconnect using the option parameters. This function assumes we are allowed to reconnect.

(err error, forceReconnect bool)

Source from the content-addressed store, hash-verified

3145// Try to reconnect using the option parameters.
3146// This function assumes we are allowed to reconnect.
3147func (nc *Conn) doReconnect(err error, forceReconnect bool) {
3148 // We want to make sure we have the other watchers shutdown properly
3149 // here before we proceed past this point.
3150 nc.waitForExits()
3151
3152 // FIXME(dlc) - We have an issue here if we have
3153 // outstanding flush points (pongs) and they were not
3154 // sent out, but are still in the pipe.
3155
3156 // Hold the lock manually and release where needed below,
3157 // can't do defer here.
3158 nc.mu.Lock()
3159
3160 // Clear any errors.
3161 nc.err = nil
3162
3163 // Perform appropriate callback if needed for a disconnect.
3164 // DisconnectedErrCB has priority over deprecated DisconnectedCB
3165 if !nc.initc {
3166 if disconnectedErrCB := nc.Opts.DisconnectedErrCB; disconnectedErrCB != nil {
3167 nc.ach.push(func() { disconnectedErrCB(nc, err) })
3168 } else if disconnectedCB := nc.Opts.DisconnectedCB; disconnectedCB != nil {
3169 nc.ach.push(func() { disconnectedCB(nc) })
3170 }
3171 } else if nc.Opts.RetryOnFailedConnect && nc.initc && err != nil {
3172 // For initial connection failure with RetryOnFailedConnect,
3173 // report the error via ReconnectErrCB if available
3174 if nc.Opts.ReconnectErrCB != nil {
3175 nc.ach.push(func() { nc.Opts.ReconnectErrCB(nc, err) })
3176 }
3177 }
3178
3179 // This is used to wait on go routines exit if we start them in the loop
3180 // but an error occurs after that.
3181 waitForGoRoutines := false
3182 var rt *time.Timer
3183 // Channel used to kick routine out of sleep when conn is closed.
3184 rqch := nc.rqch
3185
3186 // if rqch is nil, we need to set it up to signal
3187 // the reconnect loop to reconnect immediately
3188 // this means that `ForceReconnect` was called
3189 // before entering doReconnect
3190 if rqch == nil {
3191 rqch = make(chan struct{})
3192 close(rqch)
3193 }
3194
3195 // Counter that is increased when the whole list of servers has been tried.
3196 var wlf int
3197
3198 var jitter time.Duration
3199 var rw time.Duration
3200 // If a custom reconnect delay handler is set, this takes precedence.
3201 crd := nc.Opts.CustomReconnectDelayCB
3202 if crd == nil {
3203 rw = nc.Opts.ReconnectWait
3204 // TODO: since we sleep only after the whole list has been tried, we can't

Callers 3

ForceReconnect · Method · 0.95
connect · Method · 0.95
processOpErr · Method · 0.95

Calls 15

waitForExits · Method · 0.95
selectNextServer · Method · 0.95
isClosed · Method · 0.95
createConn · Method · 0.95
processConnectInit · Method · 0.95
changeConnStatus · Method · 0.95
resendSubscriptions · Method · 0.95
stopPingTimer · Method · 0.95
Flush · Method · 0.95
close · Method · 0.95
clone · Method · 0.80

Tested by

no test coverage detected