Fetch sends a single request to retrieve the given number of messages. It will wait up to the provided expiry time if not all messages are available.
(batch int, opts ...FetchOpt)
| 819 | // Fetch sends a single pull request for the given number of messages. If |
| 820 | // not all of them are available, it waits up to the request's expiry time, |
| 821 | // which starts at DefaultExpires before the supplied options are applied. |
| 822 | // |
| 823 | // When the heartbeat is left unset, requests expiring in 10 seconds or more |
| 824 | // get a 5 second heartbeat and shorter ones get none. ErrInvalidOption is |
| 825 | // returned if FetchContext and FetchMaxWait are combined, or if the expiry |
| 826 | // is less than twice the heartbeat; an error from any option is returned as is. |
| 827 | func (p *pullConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error) { |
| 828 | pull := &pullRequest{Batch: batch, Expires: DefaultExpires, Heartbeat: unset} |
| 829 | for i := range opts { |
| 830 | if err := opts[i](pull); err != nil { |
| 831 | return nil, err |
| 832 | } |
| 833 | } |
| 834 | |
| 835 | if pull.maxWaitSet && pull.ctx != nil { |
| 836 | return nil, fmt.Errorf("%w: cannot specify both FetchContext and FetchMaxWait", ErrInvalidOption) |
| 837 | } |
| 838 | |
| 839 | // Heartbeat still at the unset marker: derive it from the expiry. |
| 840 | if pull.Heartbeat == unset { |
| 841 | pull.Heartbeat = 0 |
| 842 | if pull.Expires >= 10*time.Second { |
| 843 | pull.Heartbeat = 5 * time.Second |
| 844 | } |
| 845 | } |
| 846 | if 2*pull.Heartbeat > pull.Expires { |
| 847 | return nil, fmt.Errorf("%w: expiry time should be at least 2 times the heartbeat", ErrInvalidOption) |
| 848 | } |
| 849 | |
| 850 | return p.fetch(pull) |
| 851 | } |
| 852 | |
| 853 | // FetchBytes is used to retrieve up to the provided number of bytes from the stream. |
| 854 | func (p *pullConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error) { |