(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, isPullMode bool, opts []SubOpt)
| 1671 | } |
| 1672 | |
| 1673 | func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, isPullMode bool, opts []SubOpt) (*Subscription, error) { |
| 1674 | cfg := ConsumerConfig{ |
| 1675 | DeliverPolicy: deliverPolicyNotSet, |
| 1676 | AckPolicy: ackPolicyNotSet, |
| 1677 | ReplayPolicy: replayPolicyNotSet, |
| 1678 | } |
| 1679 | o := subOpts{cfg: &cfg} |
| 1680 | if len(opts) > 0 { |
| 1681 | for _, opt := range opts { |
| 1682 | if opt == nil { |
| 1683 | continue |
| 1684 | } |
| 1685 | if err := opt.configureSubscribe(&o); err != nil { |
| 1686 | return nil, err |
| 1687 | } |
| 1688 | } |
| 1689 | } |
| 1690 | |
| 1691 | // If no stream name is specified, the subject cannot be empty. |
| 1692 | if subj == _EMPTY_ && o.stream == _EMPTY_ { |
| 1693 | return nil, errors.New("nats: subject required") |
| 1694 | } |
| 1695 | |
| 1696 | // Note that these may change based on the consumer info response we may get. |
| 1697 | hasHeartbeats := o.cfg.Heartbeat > 0 |
| 1698 | hasFC := o.cfg.FlowControl |
| 1699 | |
| 1700 | // Some checks for pull subscribers |
| 1701 | if isPullMode { |
| 1702 | // No deliver subject should be provided |
| 1703 | if o.cfg.DeliverSubject != _EMPTY_ { |
| 1704 | return nil, ErrPullSubscribeToPushConsumer |
| 1705 | } |
| 1706 | } |
| 1707 | |
| 1708 | // Some check/setting specific to queue subs |
| 1709 | if queue != _EMPTY_ { |
| 1710 | // Queue subscriber cannot have HB or FC (since messages will be randomly dispatched |
| 1711 | // to members). We may in the future have a separate NATS subscription that all members |
| 1712 | // would subscribe to and server would send on. |
| 1713 | if o.cfg.Heartbeat > 0 || o.cfg.FlowControl { |
| 1714 | // Not making this a public ErrXXX in case we allow in the future. |
| 1715 | return nil, errors.New("nats: queue subscription doesn't support idle heartbeat nor flow control") |
| 1716 | } |
| 1717 | |
| 1718 | // If this is a queue subscription and no consumer nor durable name was specified, |
| 1719 | // then we will use the queue name as a durable name. |
| 1720 | if o.consumer == _EMPTY_ && o.cfg.Durable == _EMPTY_ { |
| 1721 | if err := checkConsumerName(queue); err != nil { |
| 1722 | return nil, err |
| 1723 | } |
| 1724 | o.cfg.Durable = queue |
| 1725 | } |
| 1726 | } |
| 1727 | |
| 1728 | var ( |
| 1729 | err error |
| 1730 | shouldCreate bool |
no test coverage detected