MCPcopy
hub / github.com/nats-io/nats.go / Fetch

Method Fetch

js.go:2968–3194  ·  view source on GitHub ↗

Fetch pulls a batch of messages from a stream for a pull consumer.

(batch int, opts ...PullOpt)

Source from the content-addressed store, hash-verified

2966
2967// Fetch pulls a batch of messages from a stream for a pull consumer.
2968func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
2969 if sub == nil {
2970 return nil, ErrBadSubscription
2971 }
2972 if batch < 1 {
2973 return nil, ErrInvalidArg
2974 }
2975
2976 var o pullOpts
2977 for _, opt := range opts {
2978 if err := opt.configurePull(&o); err != nil {
2979 return nil, err
2980 }
2981 }
2982 if o.ctx != nil && o.ttl != 0 {
2983 return nil, ErrContextAndTimeout
2984 }
2985
2986 sub.mu.Lock()
2987 jsi := sub.jsi
2988 // Reject if this is not a pull subscription. Note that sub.typ is SyncSubscription,
2989 // so check for jsi.pull boolean instead.
2990 if jsi == nil || !jsi.pull {
2991 sub.mu.Unlock()
2992 return nil, ErrTypeSubscription
2993 }
2994
2995 nc := sub.conn
2996 nms := sub.jsi.nms
2997 rply, _ := newFetchInbox(jsi.deliver)
2998 js := sub.jsi.js
2999 pmc := len(sub.mch) > 0
3000
3001 // All fetch requests have an expiration, in case of no explicit expiration
3002 // then the default timeout of the JetStream context is used.
3003 ttl := o.ttl
3004 if ttl == 0 {
3005 ttl = js.opts.wait
3006 }
3007 sub.mu.Unlock()
3008
3009 // Use the given context or setup a default one for the span
3010 // of the pull batch request.
3011 var (
3012 ctx = o.ctx
3013 err error
3014 cancel context.CancelFunc
3015 )
3016 if ctx == nil {
3017 ctx, cancel = context.WithTimeout(context.Background(), ttl)
3018 } else if _, hasDeadline := ctx.Deadline(); !hasDeadline {
3019 // Prevent from passing the background context which will just block
3020 // and cannot be canceled either.
3021 if octx, ok := ctx.(ContextOpt); ok && octx.Context == context.Background() {
3022 return nil, ErrNoDeadlineContext
3023 }
3024
3025 // If the context did not have a deadline, then create a new child context

Calls 15

nextMsgWithContext · Method · 0.95
checkCtxErr · Method · 0.95
newFetchInbox · Function · 0.85
Errorf · Method · 0.80
Duration · Method · 0.80
Store · Method · 0.80
RemoveStatusListener · Method · 0.80
Load · Method · 0.80
checkMsg · Function · 0.70
configurePull · Method · 0.65
Done · Method · 0.65
Err · Method · 0.65