FetchBytes is used to retrieve up to the provided number of bytes from the stream.
(maxBytes int, opts ...FetchOpt)
| 852 | |
| 853 | // FetchBytes is used to retrieve up to the provided number of bytes (maxBytes) from the stream. |
| 854 | func (p *pullConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error) { |
| 855 | req := &pullRequest{ |
| 856 | Batch: defaultBatchMaxBytesOnly, |
| 857 | MaxBytes: maxBytes, |
| 858 | Expires: DefaultExpires, |
| 859 | Heartbeat: unset, |
| 860 | } |
| 861 | for _, opt := range opts { |
| 862 | if err := opt(req); err != nil { |
| 863 | return nil, err |
| 864 | } |
| 865 | } |
| 866 | |
| 867 | if req.ctx != nil && req.maxWaitSet { |
| 868 | return nil, fmt.Errorf("%w: cannot specify both FetchContext and FetchMaxWait", ErrInvalidOption) |
| 869 | } |
| 870 | |
| 871 | // If the heartbeat was not explicitly set, default it to 5 seconds for pulls whose |
| 872 | // expiry is 10 seconds or longer, and disable it (0) for shorter pulls. |
| 873 | if req.Heartbeat == unset { |
| 874 | if req.Expires >= 10*time.Second { |
| 875 | req.Heartbeat = 5 * time.Second |
| 876 | } else { |
| 877 | req.Heartbeat = 0 |
| 878 | } |
| 879 | } |
| 880 | if req.Expires < 2*req.Heartbeat { |
| 881 | return nil, fmt.Errorf("%w: expiry time should be at least 2 times the heartbeat", ErrInvalidOption) |
| 882 | } |
| 883 | |
| 884 | return p.fetch(req) |
| 885 | } |
| 886 | |
| 887 | // FetchNoWait sends a single request to retrieve given number of messages. |
| 888 | // FetchNoWait will only return messages that are available at the time of the |