MCPcopy
hub / github.com/nats-io/nats.go / FetchBatch

Method FetchBatch

js.go:3266–3497  ·  view source on GitHub ↗

FetchBatch pulls a batch of messages from a stream for a pull consumer. Unlike [Subscription.Fetch], it is non-blocking and returns a [MessageBatch], allowing the caller to retrieve incoming messages from a channel. The returned channel is always closed after all messages for a batch have been delivered by the server…

(batch int, opts ...PullOpt)

Source from the content-addressed store, hash-verified

3264// This method will not return error in case of pull request expiry (even if there are no messages).
3265// Any other error encountered when receiving messages will cause FetchBatch to stop receiving new messages.
3266func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, error) {
3267 if sub == nil {
3268 return nil, ErrBadSubscription
3269 }
3270 if batch < 1 {
3271 return nil, ErrInvalidArg
3272 }
3273
3274 var o pullOpts
3275 for _, opt := range opts {
3276 if err := opt.configurePull(&o); err != nil {
3277 return nil, err
3278 }
3279 }
3280 if o.ctx != nil && o.ttl != 0 {
3281 return nil, ErrContextAndTimeout
3282 }
3283 sub.mu.Lock()
3284 jsi := sub.jsi
3285 // Reject if this is not a pull subscription. Note that sub.typ is SyncSubscription,
3286 // so check for jsi.pull boolean instead.
3287 if jsi == nil || !jsi.pull {
3288 sub.mu.Unlock()
3289 return nil, ErrTypeSubscription
3290 }
3291
3292 nc := sub.conn
3293 nms := sub.jsi.nms
3294 rply, reqID := newFetchInbox(sub.jsi.deliver)
3295 js := sub.jsi.js
3296 pmc := len(sub.mch) > 0
3297
3298 // All fetch requests have an expiration, in case of no explicit expiration
3299 // then the default timeout of the JetStream context is used.
3300 ttl := o.ttl
3301 if ttl == 0 {
3302 ttl = js.opts.wait
3303 }
3304 sub.mu.Unlock()
3305
3306 // Use the given context or setup a default one for the span
3307 // of the pull batch request.
3308 var (
3309 ctx = o.ctx
3310 cancel context.CancelFunc
3311 cancelContext = true
3312 )
3313 if ctx == nil {
3314 ctx, cancel = context.WithTimeout(context.Background(), ttl)
3315 } else if _, hasDeadline := ctx.Deadline(); !hasDeadline {
3316 // Prevent from passing the background context which will just block
3317 // and cannot be canceled either.
3318 if octx, ok := ctx.(ContextOpt); ok && octx.Context == context.Background() {
3319 return nil, ErrNoDeadlineContext
3320 }
3321
3322 // If the context did not have a deadline, then create a new child context
3323 // that will use the default timeout from the JS context.

Calls 15

nextMsgWithContextMethod · 0.95
checkCtxErrMethod · 0.95
newFetchInboxFunction · 0.85
subjectMatchesReqIDFunction · 0.85
ErrorfMethod · 0.80
DurationMethod · 0.80
StoreMethod · 0.80
RemoveStatusListenerMethod · 0.80
LoadMethod · 0.80
checkMsgFunction · 0.70
configurePullMethod · 0.65
DoneMethod · 0.65