MCPcopy
hub / github.com/nats-io/nats.go / FetchBytes

Method FetchBytes

jetstream/pull.go:854–885  ·  view source on GitHub ↗

FetchBytes is used to retrieve up to a provided bytes from the stream.

(maxBytes int, opts ...FetchOpt)

Source from the content-addressed store, hash-verified

852
853// FetchBytes is used to retrieve up to a provided bytes from the stream.
854func (p *pullConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error) {
855 req := &pullRequest{
856 Batch: defaultBatchMaxBytesOnly,
857 MaxBytes: maxBytes,
858 Expires: DefaultExpires,
859 Heartbeat: unset,
860 }
861 for _, opt := range opts {
862 if err := opt(req); err != nil {
863 return nil, err
864 }
865 }
866
867 if req.ctx != nil && req.maxWaitSet {
868 return nil, fmt.Errorf("%w: cannot specify both FetchContext and FetchMaxWait", ErrInvalidOption)
869 }
870
871 // if heartbeat was not explicitly set, set it to 5 seconds for longer pulls
872 // and disable it for shorter pulls
873 if req.Heartbeat == unset {
874 if req.Expires >= 10*time.Second {
875 req.Heartbeat = 5 * time.Second
876 } else {
877 req.Heartbeat = 0
878 }
879 }
880 if req.Expires < 2*req.Heartbeat {
881 return nil, fmt.Errorf("%w: expiry time should be at least 2 times the heartbeat", ErrInvalidOption)
882 }
883
884 return p.fetch(req)
885}
886
887// FetchNoWait sends a single request to retrieve given number of messages.
888// FetchNoWait will only return messages that are available at the time of the

Callers

nothing calls this directly

Calls 2

fetchMethod · 0.95
ErrorfMethod · 0.80

Tested by

no test coverage detected