Next retrieves the next message on a stream. If MessagesContext is closed (either stopped or drained), Next will return an ErrMsgIteratorClosed error. An optional timeout or context can be provided using NextOpt options. If none are provided, Next will block indefinitely until a message is available, the iterator is closed, or a heartbeat error occurs.
(opts ...NextOpt)
| 592 | // options. If none are provided, Next will block indefinitely until a |
| 593 | // message is available, iterator is closed or a heartbeat error occurs. |
| 594 | func (s *pullSubscription) Next(opts ...NextOpt) (Msg, error) { |
| 595 | var nextOpts nextOpts |
| 596 | for _, opt := range opts { |
| 597 | opt.configureNext(&nextOpts) |
| 598 | } |
| 599 | |
| 600 | if nextOpts.timeout > 0 && nextOpts.ctx != nil { |
| 601 | return nil, fmt.Errorf("%w: cannot specify both NextMaxWait and NextContext", ErrInvalidOption) |
| 602 | } |
| 603 | |
| 604 | // Create timeout channel if needed |
| 605 | var timeoutCh <-chan time.Time |
| 606 | if nextOpts.timeout > 0 { |
| 607 | timer := time.NewTimer(nextOpts.timeout) |
| 608 | defer timer.Stop() |
| 609 | timeoutCh = timer.C |
| 610 | } |
| 611 | |
| 612 | // Use context if provided |
| 613 | var ctxDone <-chan struct{} |
| 614 | if nextOpts.ctx != nil { |
| 615 | ctxDone = nextOpts.ctx.Done() |
| 616 | } |
| 617 | |
| 618 | s.Lock() |
| 619 | defer s.Unlock() |
| 620 | drainMode := s.draining.Load() == 1 |
| 621 | closed := s.closed.Load() == 1 |
| 622 | if closed && !drainMode { |
| 623 | // Check if iterator was closed due to connection closure |
| 624 | if s.consumer.js.conn.IsClosed() { |
| 625 | return nil, fmt.Errorf("%w: %w", ErrMsgIteratorClosed, ErrConnectionClosed) |
| 626 | } |
| 627 | return nil, ErrMsgIteratorClosed |
| 628 | } |
| 629 | hbMonitor := s.scheduleHeartbeatCheck(s.consumeOpts.Heartbeat) |
| 630 | defer func() { |
| 631 | if hbMonitor != nil { |
| 632 | hbMonitor.Stop() |
| 633 | } |
| 634 | }() |
| 635 | |
| 636 | isConnected := true |
| 637 | if s.consumeOpts.StopAfter > 0 && s.delivered >= s.consumeOpts.StopAfter { |
| 638 | s.Stop() |
| 639 | return nil, ErrMsgIteratorClosed |
| 640 | } |
| 641 | |
| 642 | for { |
| 643 | s.checkPending() |
| 644 | select { |
| 645 | case msg, ok := <-s.msgs: |
| 646 | if !ok { |
| 647 | // if msgs channel is closed, it means that subscription was either drained or stopped |
| 648 | s.consumer.subs.Delete(s.id) |
| 649 | s.draining.CompareAndSwap(1, 0) |
| 650 | // Check if iterator was closed due to connection closure |
| 651 | if s.consumer.js.conn.IsClosed() { |
nothing calls this directly
no test coverage detected