MCPcopy
hub / github.com/nats-io/nats.go / resendSubscriptions

Method resendSubscriptions

nats.go:5872–5909  ·  view source on GitHub ↗

resendSubscriptions sends our subscription state back to the server. It is used during reconnects.

()

Source from the content-addressed store, hash-verified

5870// resendSubscriptions will send our subscription state back to the
5871// server. Used in reconnects
5872func (nc *Conn) resendSubscriptions() {
5873 // Since we are going to send protocols to the server, we don't want to
5874 // be holding the subsMu lock (which is used in processMsg). So copy
5875 // the subscriptions in a temporary array.
5876 nc.subsMu.RLock()
5877 subs := make([]*Subscription, 0, len(nc.subs))
5878 for _, s := range nc.subs {
5879 subs = append(subs, s)
5880 }
5881 nc.subsMu.RUnlock()
5882 for _, s := range subs {
5883 adjustedMax := uint64(0)
5884 s.mu.Lock()
5885 // when resending subscriptions, the permissions error should be cleared
5886 // since the user may have fixed the permissions issue
5887 s.permissionsErr = nil
5888 if s.max > 0 {
5889 if s.delivered < s.max {
5890 adjustedMax = s.max - s.delivered
5891 }
5892 // adjustedMax could be 0 here if the number of delivered msgs
5893 // reached the max, if so unsubscribe.
5894 if adjustedMax == 0 {
5895 s.mu.Unlock()
5896 nc.bw.writeDirect(fmt.Sprintf(unsubProto, s.sid, _EMPTY_))
5897 continue
5898 }
5899 }
5900 subj, queue, sid := s.Subject, s.Queue, s.sid
5901 s.mu.Unlock()
5902
5903 nc.bw.writeDirect(fmt.Sprintf(subProto, subj, queue, sid))
5904 if adjustedMax > 0 {
5905 maxStr := strconv.Itoa(int(adjustedMax))
5906 nc.bw.writeDirect(fmt.Sprintf(unsubProto, sid, maxStr))
5907 }
5908 }
5909}
5910
5911// This will clear any pending flush calls and release pending calls.
5912// Lock is assumed to be held by the caller.

Callers 1

doReconnectMethod · 0.95

Calls 1

writeDirectMethod · 0.80

Tested by

no test coverage detected