(subj, queue string, cb MsgHandler, ch chan *Msg, errCh chan (error), isSync bool, js *jsSub)
| 4891 | } |
| 4892 | |
| 4893 | func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, errCh chan (error), isSync bool, js *jsSub) (*Subscription, error) { |
| 4894 | if nc == nil { |
| 4895 | return nil, ErrInvalidConnection |
| 4896 | } |
| 4897 | if badSubject(subj) { |
| 4898 | return nil, ErrBadSubject |
| 4899 | } |
| 4900 | if queue != _EMPTY_ && badQueue(queue) { |
| 4901 | return nil, ErrBadQueueName |
| 4902 | } |
| 4903 | |
| 4904 | // Check for some error conditions. |
| 4905 | if nc.isClosed() { |
| 4906 | return nil, ErrConnectionClosed |
| 4907 | } |
| 4908 | if nc.isDraining() { |
| 4909 | return nil, ErrConnectionDraining |
| 4910 | } |
| 4911 | |
| 4912 | if cb == nil && ch == nil { |
| 4913 | return nil, ErrBadSubscription |
| 4914 | } |
| 4915 | |
| 4916 | sub := &Subscription{ |
| 4917 | Subject: subj, |
| 4918 | Queue: queue, |
| 4919 | mcb: cb, |
| 4920 | conn: nc, |
| 4921 | jsi: js, |
| 4922 | } |
| 4923 | // Set pending limits. |
| 4924 | if ch != nil { |
| 4925 | sub.pMsgsLimit = cap(ch) |
| 4926 | } else { |
| 4927 | sub.pMsgsLimit = DefaultSubPendingMsgsLimit |
| 4928 | } |
| 4929 | sub.pBytesLimit = DefaultSubPendingBytesLimit |
| 4930 | |
| 4931 | // If we have an async callback, start up a sub-specific |
| 4932 | // goroutine to deliver the messages. |
| 4933 | var sr bool |
| 4934 | if cb != nil { |
| 4935 | sub.typ = AsyncSubscription |
| 4936 | sub.pCond = sync.NewCond(&sub.mu) |
| 4937 | sr = true |
| 4938 | } else if !isSync { |
| 4939 | sub.typ = ChanSubscription |
| 4940 | sub.mch = ch |
| 4941 | } else { // Sync Subscription |
| 4942 | sub.typ = SyncSubscription |
| 4943 | sub.mch = ch |
| 4944 | sub.errCh = errCh |
| 4945 | } |
| 4946 | |
| 4947 | nc.subsMu.Lock() |
| 4948 | nc.ssid++ |
| 4949 | sub.sid = nc.ssid |
| 4950 | nc.subs[sub.sid] = sub |
no test coverage detected