MCPcopy
hub / github.com/nats-io/nats.go / subscribeLocked

Method subscribeLocked

nats.go:4893–4967  ·  view source on GitHub ↗
(subj, queue string, cb MsgHandler, ch chan *Msg, errCh chan (error), isSync bool, js *jsSub)

Source from the content-addressed store, hash-verified

4891}
4892
4893func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, errCh chan (error), isSync bool, js *jsSub) (*Subscription, error) {
4894 if nc == nil {
4895 return nil, ErrInvalidConnection
4896 }
4897 if badSubject(subj) {
4898 return nil, ErrBadSubject
4899 }
4900 if queue != _EMPTY_ && badQueue(queue) {
4901 return nil, ErrBadQueueName
4902 }
4903
4904 // Check for some error conditions.
4905 if nc.isClosed() {
4906 return nil, ErrConnectionClosed
4907 }
4908 if nc.isDraining() {
4909 return nil, ErrConnectionDraining
4910 }
4911
4912 if cb == nil && ch == nil {
4913 return nil, ErrBadSubscription
4914 }
4915
4916 sub := &Subscription{
4917 Subject: subj,
4918 Queue: queue,
4919 mcb: cb,
4920 conn: nc,
4921 jsi: js,
4922 }
4923 // Set pending limits.
4924 if ch != nil {
4925 sub.pMsgsLimit = cap(ch)
4926 } else {
4927 sub.pMsgsLimit = DefaultSubPendingMsgsLimit
4928 }
4929 sub.pBytesLimit = DefaultSubPendingBytesLimit
4930
4931 // If we have an async callback, start up a sub specific
4932 // Go routine to deliver the messages.
4933 var sr bool
4934 if cb != nil {
4935 sub.typ = AsyncSubscription
4936 sub.pCond = sync.NewCond(&sub.mu)
4937 sr = true
4938 } else if !isSync {
4939 sub.typ = ChanSubscription
4940 sub.mch = ch
4941 } else { // Sync Subscription
4942 sub.typ = SyncSubscription
4943 sub.mch = ch
4944 sub.errCh = errCh
4945 }
4946
4947 nc.subsMu.Lock()
4948 nc.ssid++
4949 sub.sid = nc.ssid
4950 nc.subs[sub.sid] = sub

Callers 2

subscribe · Method · 0.95

Calls 9

isClosed · Method · 0.95
isDraining · Method · 0.95
waitForMsgs · Method · 0.95
isReconnecting · Method · 0.95
kickFlusher · Method · 0.95
changeSubStatus · Method · 0.95
badSubject · Function · 0.85
badQueue · Function · 0.85
appendString · Method · 0.80

Tested by

no test coverage detected