Fetch pulls a batch of messages from a stream for a pull consumer.
(batch int, opts ...PullOpt)
| 2966 | |
| 2967 | // Fetch pulls a batch of messages from a stream for a pull consumer. |
| 2968 | func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { |
| 2969 | if sub == nil { |
| 2970 | return nil, ErrBadSubscription |
| 2971 | } |
| 2972 | if batch < 1 { |
| 2973 | return nil, ErrInvalidArg |
| 2974 | } |
| 2975 | |
| 2976 | var o pullOpts |
| 2977 | for _, opt := range opts { |
| 2978 | if err := opt.configurePull(&o); err != nil { |
| 2979 | return nil, err |
| 2980 | } |
| 2981 | } |
| 2982 | if o.ctx != nil && o.ttl != 0 { |
| 2983 | return nil, ErrContextAndTimeout |
| 2984 | } |
| 2985 | |
| 2986 | sub.mu.Lock() |
| 2987 | jsi := sub.jsi |
| 2988 | // Reject if this is not a pull subscription. Note that sub.typ is SyncSubscription, |
| 2989 | // so check for jsi.pull boolean instead. |
| 2990 | if jsi == nil || !jsi.pull { |
| 2991 | sub.mu.Unlock() |
| 2992 | return nil, ErrTypeSubscription |
| 2993 | } |
| 2994 | |
| 2995 | nc := sub.conn |
| 2996 | nms := sub.jsi.nms |
| 2997 | rply, _ := newFetchInbox(jsi.deliver) |
| 2998 | js := sub.jsi.js |
| 2999 | pmc := len(sub.mch) > 0 |
| 3000 | |
| 3001 | // All fetch requests have an expiration; if no explicit expiration is given, |
| 3002 | // the default timeout of the JetStream context is used. |
| 3003 | ttl := o.ttl |
| 3004 | if ttl == 0 { |
| 3005 | ttl = js.opts.wait |
| 3006 | } |
| 3007 | sub.mu.Unlock() |
| 3008 | |
| 3009 | // Use the given context or set up a default one for the span |
| 3010 | // of the pull batch request. |
| 3011 | var ( |
| 3012 | ctx = o.ctx |
| 3013 | err error |
| 3014 | cancel context.CancelFunc |
| 3015 | ) |
| 3016 | if ctx == nil { |
| 3017 | ctx, cancel = context.WithTimeout(context.Background(), ttl) |
| 3018 | } else if _, hasDeadline := ctx.Deadline(); !hasDeadline { |
| 3019 | // Reject the background context, since it would just block |
| 3020 | // and cannot be canceled either. |
| 3021 | if octx, ok := ctx.(ContextOpt); ok && octx.Context == context.Background() { |
| 3022 | return nil, ErrNoDeadlineContext |
| 3023 | } |
| 3024 | |
| 3025 | // If the context did not have a deadline, then create a new child context |