MCPcopy
hub / github.com/nats-io/nats.go / Fetch

Method Fetch

jetstream/pull.go:821–851  ·  view source on GitHub ↗

Fetch sends a single request to retrieve given number of messages. It will wait up to provided expiry time if not all messages are available.

(batch int, opts ...FetchOpt)

Source from the content-addressed store, hash-verified

819// Fetch sends a single request to retrieve given number of messages.
820// It will wait up to provided expiry time if not all messages are available.
821func (p *pullConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error) {
822 req := &pullRequest{
823 Batch: batch,
824 Expires: DefaultExpires,
825 Heartbeat: unset,
826 }
827 for _, opt := range opts {
828 if err := opt(req); err != nil {
829 return nil, err
830 }
831 }
832
833 if req.ctx != nil && req.maxWaitSet {
834 return nil, fmt.Errorf("%w: cannot specify both FetchContext and FetchMaxWait", ErrInvalidOption)
835 }
836
837 // if heartbeat was not explicitly set, set it to 5 seconds for longer pulls
838 // and disable it for shorter pulls
839 if req.Heartbeat == unset {
840 if req.Expires >= 10*time.Second {
841 req.Heartbeat = 5 * time.Second
842 } else {
843 req.Heartbeat = 0
844 }
845 }
846 if req.Expires < 2*req.Heartbeat {
847 return nil, fmt.Errorf("%w: expiry time should be at least 2 times the heartbeat", ErrInvalidOption)
848 }
849
850 return p.fetch(req)
851}
852
853// FetchBytes is used to retrieve up to a provided bytes from the stream.
854func (p *pullConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error) {

Callers 1

NextMethod · 0.95

Calls 2

fetchMethod · 0.95
ErrorfMethod · 0.80

Tested by

no test coverage detected