resendSubscriptions will send our subscription state back to the server. Used in reconnects
()
| 5870 | // resendSubscriptions will send our subscription state back to the |
| 5871 | // server. Used in reconnects |
| 5872 | func (nc *Conn) resendSubscriptions() { |
| 5873 | // Since we are going to send protocols to the server, we don't want to |
| 5874 | // be holding the subsMu lock (which is used in processMsg). So copy |
| 5875 | // the subscriptions in a temporary array. |
| 5876 | nc.subsMu.RLock() |
| 5877 | subs := make([]*Subscription, 0, len(nc.subs)) |
| 5878 | for _, s := range nc.subs { |
| 5879 | subs = append(subs, s) |
| 5880 | } |
| 5881 | nc.subsMu.RUnlock() |
| 5882 | for _, s := range subs { |
| 5883 | adjustedMax := uint64(0) |
| 5884 | s.mu.Lock() |
| 5885 | // when resending subscriptions, the permissions error should be cleared |
| 5886 | // since the user may have fixed the permissions issue |
| 5887 | s.permissionsErr = nil |
| 5888 | if s.max > 0 { |
| 5889 | if s.delivered < s.max { |
| 5890 | adjustedMax = s.max - s.delivered |
| 5891 | } |
| 5892 | // adjustedMax could be 0 here if the number of delivered msgs |
| 5893 | // reached the max, if so unsubscribe. |
| 5894 | if adjustedMax == 0 { |
| 5895 | s.mu.Unlock() |
| 5896 | nc.bw.writeDirect(fmt.Sprintf(unsubProto, s.sid, _EMPTY_)) |
| 5897 | continue |
| 5898 | } |
| 5899 | } |
| 5900 | subj, queue, sid := s.Subject, s.Queue, s.sid |
| 5901 | s.mu.Unlock() |
| 5902 | |
| 5903 | nc.bw.writeDirect(fmt.Sprintf(subProto, subj, queue, sid)) |
| 5904 | if adjustedMax > 0 { |
| 5905 | maxStr := strconv.Itoa(int(adjustedMax)) |
| 5906 | nc.bw.writeDirect(fmt.Sprintf(unsubProto, sid, maxStr)) |
| 5907 | } |
| 5908 | } |
| 5909 | } |
| 5910 | |
| 5911 | // This will clear any pending flush calls and release pending calls. |
| 5912 | // Lock is assumed to be held by the caller. |
no test coverage detected