MCPcopy
hub / github.com/nats-io/nats.go / subscribe

Method subscribe

js.go:1673–2058  ·  view source on GitHub ↗
(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, isPullMode bool, opts []SubOpt)

Source from the content-addressed store, hash-verified

1671}
1672
1673func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, isPullMode bool, opts []SubOpt) (*Subscription, error) {
1674 cfg := ConsumerConfig{
1675 DeliverPolicy: deliverPolicyNotSet,
1676 AckPolicy: ackPolicyNotSet,
1677 ReplayPolicy: replayPolicyNotSet,
1678 }
1679 o := subOpts{cfg: &cfg}
1680 if len(opts) > 0 {
1681 for _, opt := range opts {
1682 if opt == nil {
1683 continue
1684 }
1685 if err := opt.configureSubscribe(&o); err != nil {
1686 return nil, err
1687 }
1688 }
1689 }
1690
1691 // If no stream name is specified, the subject cannot be empty.
1692 if subj == _EMPTY_ && o.stream == _EMPTY_ {
1693 return nil, errors.New("nats: subject required")
1694 }
1695
1696 // Note that these may change based on the consumer info response we may get.
1697 hasHeartbeats := o.cfg.Heartbeat > 0
1698 hasFC := o.cfg.FlowControl
1699
1700 // Some checks for pull subscribers
1701 if isPullMode {
1702 // No deliver subject should be provided
1703 if o.cfg.DeliverSubject != _EMPTY_ {
1704 return nil, ErrPullSubscribeToPushConsumer
1705 }
1706 }
1707
1708 // Some check/setting specific to queue subs
1709 if queue != _EMPTY_ {
1710 // Queue subscriber cannot have HB or FC (since messages will be randomly dispatched
1711 // to members). We may in the future have a separate NATS subscription that all members
1712 // would subscribe to and server would send on.
1713 if o.cfg.Heartbeat > 0 || o.cfg.FlowControl {
1714 // Not making this a public ErrXXX in case we allow in the future.
1715 return nil, errors.New("nats: queue subscription doesn't support idle heartbeat nor flow control")
1716 }
1717
1718 // If this is a queue subscription and no consumer nor durable name was specified,
1719 // then we will use the queue name as a durable name.
1720 if o.consumer == _EMPTY_ && o.cfg.Durable == _EMPTY_ {
1721 if err := checkConsumerName(queue); err != nil {
1722 return nil, err
1723 }
1724 o.cfg.Durable = queue
1725 }
1726 }
1727
1728 var (
1729 err error
1730 shouldCreate bool

Callers 8

SubscribeMethod · 0.95
SubscribeSyncMethod · 0.95
QueueSubscribeMethod · 0.95
QueueSubscribeSyncMethod · 0.95
ChanSubscribeMethod · 0.95
ChanQueueSubscribeMethod · 0.95
PullSubscribeMethod · 0.95
bindRecvChanMethod · 0.45

Calls 15

StreamNameBySubjectMethod · 0.95
ConsumerInfoMethod · 0.95
apiSubjMethod · 0.95
upsertConsumerMethod · 0.95
checkConsumerNameFunction · 0.85
processConsInfoFunction · 0.85
getHashFunction · 0.85
ContextFunction · 0.85
NewInboxMethod · 0.80
UnsubscribeMethod · 0.80
SetPendingLimitsMethod · 0.80
TypeMethod · 0.80

Tested by

no test coverage detected