FetchBatch pulls a batch of messages from a stream for a pull consumer. Unlike [Subscription.Fetch], it is non-blocking and returns a [MessageBatch], allowing the caller to retrieve incoming messages from a channel. The returned channel is always closed after all messages for a batch have been delivered by the server.
(batch int, opts ...PullOpt)
| 3264 | // This method will not return an error in case of pull request expiry (even if there are no messages). |
| 3265 | // Any other error encountered when receiving messages will cause FetchBatch to stop receiving new messages. |
| 3266 | func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, error) { |
| 3267 | if sub == nil { |
| 3268 | return nil, ErrBadSubscription |
| 3269 | } |
| 3270 | if batch < 1 { |
| 3271 | return nil, ErrInvalidArg |
| 3272 | } |
| 3273 | |
| 3274 | var o pullOpts |
| 3275 | for _, opt := range opts { |
| 3276 | if err := opt.configurePull(&o); err != nil { |
| 3277 | return nil, err |
| 3278 | } |
| 3279 | } |
| 3280 | if o.ctx != nil && o.ttl != 0 { |
| 3281 | return nil, ErrContextAndTimeout |
| 3282 | } |
| 3283 | sub.mu.Lock() |
| 3284 | jsi := sub.jsi |
| 3285 | // Reject if this is not a pull subscription. Note that sub.typ is SyncSubscription, |
| 3286 | // so check for jsi.pull boolean instead. |
| 3287 | if jsi == nil || !jsi.pull { |
| 3288 | sub.mu.Unlock() |
| 3289 | return nil, ErrTypeSubscription |
| 3290 | } |
| 3291 | |
| 3292 | nc := sub.conn |
| 3293 | nms := sub.jsi.nms |
| 3294 | rply, reqID := newFetchInbox(sub.jsi.deliver) |
| 3295 | js := sub.jsi.js |
| 3296 | pmc := len(sub.mch) > 0 |
| 3297 | |
| 3298 | // All fetch requests have an expiration, in case of no explicit expiration |
| 3299 | // then the default timeout of the JetStream context is used. |
| 3300 | ttl := o.ttl |
| 3301 | if ttl == 0 { |
| 3302 | ttl = js.opts.wait |
| 3303 | } |
| 3304 | sub.mu.Unlock() |
| 3305 | |
| 3306 | // Use the given context or setup a default one for the span |
| 3307 | // of the pull batch request. |
| 3308 | var ( |
| 3309 | ctx = o.ctx |
| 3310 | cancel context.CancelFunc |
| 3311 | cancelContext = true |
| 3312 | ) |
| 3313 | if ctx == nil { |
| 3314 | ctx, cancel = context.WithTimeout(context.Background(), ttl) |
| 3315 | } else if _, hasDeadline := ctx.Deadline(); !hasDeadline { |
| 3316 | // Prevent from passing the background context which will just block |
| 3317 | // and cannot be canceled either. |
| 3318 | if octx, ok := ctx.(ContextOpt); ok && octx.Context == context.Background() { |
| 3319 | return nil, ErrNoDeadlineContext |
| 3320 | } |
| 3321 | |
| 3322 | // If the context did not have a deadline, then create a new child context |
| 3323 | // that will use the default timeout from the JS context. |