Fetch is used to retrieve up to a provided number of messages from a stream. This method will always send a single request and wait until either all messages are retrieved or the request times out. It is not efficient to use Fetch on an ordered consumer, as it will reset the consumer for each subsequent Fetch call. Consider using [Consumer.Consume] or [Consumer.Messages] instead.
(batch int, opts ...FetchOpt)
| 418 | // reset the consumer for each subsequent Fetch call. |
| 419 | // Consider using [Consumer.Consume] or [Consumer.Messages] instead. |
| 420 | func (c *orderedConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error) { |
| 421 | c.Lock() |
| 422 | if c.consumerType == consumerTypeConsume { |
| 423 | c.Unlock() |
| 424 | return nil, ErrOrderConsumerUsedAsConsume |
| 425 | } |
| 426 | if c.runningFetch != nil { |
| 427 | if !c.runningFetch.closed() { |
| 428 | return nil, ErrOrderedConsumerConcurrentRequests |
| 429 | } |
| 430 | if c.runningFetch.sseq != 0 { |
| 431 | c.cursor.streamSeq = c.runningFetch.sseq |
| 432 | } |
| 433 | } |
| 434 | c.consumerType = consumerTypeFetch |
| 435 | sub := orderedSubscription{ |
| 436 | consumer: c, |
| 437 | done: make(chan struct{}), |
| 438 | } |
| 439 | c.subscription = &sub |
| 440 | c.Unlock() |
| 441 | err := c.reset() |
| 442 | if err != nil { |
| 443 | return nil, err |
| 444 | } |
| 445 | msgs, err := c.currentConsumer.Fetch(batch, opts...) |
| 446 | if err != nil { |
| 447 | return nil, err |
| 448 | } |
| 449 | c.runningFetch = msgs.(*fetchResult) |
| 450 | return msgs, nil |
| 451 | } |
| 452 | |
| 453 | // FetchBytes is used to retrieve up to a provided bytes from the |
| 454 | // stream. This method will always send a single request and wait until |