MCPcopy
hub / github.com/nats-io/nats.go / Next

Method Next

jetstream/pull.go:594–714  ·  jetstream/pull.go::pullSubscription.Next

Next retrieves next message on a stream. If MessagesContext is closed (either stopped or drained), Next will return ErrMsgIteratorClosed error. An optional timeout or context can be provided using NextOpt options. If none are provided, Next will block indefinitely until a message is available, itera

(opts ...NextOpt)

Source from the content-addressed store, hash-verified

592// options. If none are provided, Next will block indefinitely until a
593// message is available, iterator is closed or a heartbeat error occurs.
594func (s *pullSubscription) Next(opts ...NextOpt) (Msg, error) {
595 var nextOpts nextOpts
596 for _, opt := range opts {
597 opt.configureNext(&nextOpts)
598 }
599
600 if nextOpts.timeout > 0 && nextOpts.ctx != nil {
601 return nil, fmt.Errorf("%w: cannot specify both NextMaxWait and NextContext", ErrInvalidOption)
602 }
603
604 // Create timeout channel if needed
605 var timeoutCh <-chan time.Time
606 if nextOpts.timeout > 0 {
607 timer := time.NewTimer(nextOpts.timeout)
608 defer timer.Stop()
609 timeoutCh = timer.C
610 }
611
612 // Use context if provided
613 var ctxDone <-chan struct{}
614 if nextOpts.ctx != nil {
615 ctxDone = nextOpts.ctx.Done()
616 }
617
618 s.Lock()
619 defer s.Unlock()
620 drainMode := s.draining.Load() == 1
621 closed := s.closed.Load() == 1
622 if closed && !drainMode {
623 // Check if iterator was closed due to connection closure
624 if s.consumer.js.conn.IsClosed() {
625 return nil, fmt.Errorf("%w: %w", ErrMsgIteratorClosed, ErrConnectionClosed)
626 }
627 return nil, ErrMsgIteratorClosed
628 }
629 hbMonitor := s.scheduleHeartbeatCheck(s.consumeOpts.Heartbeat)
630 defer func() {
631 if hbMonitor != nil {
632 hbMonitor.Stop()
633 }
634 }()
635
636 isConnected := true
637 if s.consumeOpts.StopAfter > 0 && s.delivered >= s.consumeOpts.StopAfter {
638 s.Stop()
639 return nil, ErrMsgIteratorClosed
640 }
641
642 for {
643 s.checkPending()
644 select {
645 case msg, ok := <-s.msgs:
646 if !ok {
647 // if msgs channel is closed, it means that subscription was either drained or stopped
648 s.consumer.subs.Delete(s.id)
649 s.draining.CompareAndSwap(1, 0)
650 // Check if iterator was closed due to connection closure
651 if s.consumer.js.conn.IsClosed() {

Callers

nothing calls this directly

Calls 15

StopMethod · 0.95
checkPendingMethod · 0.95
handleStatusMsgMethod · 0.95
decrementPendingMsgsMethod · 0.95
ErrorfMethod · 0.80
LoadMethod · 0.80
IsClosedMethod · 0.80
CompareAndSwapMethod · 0.80
setPinIDMethod · 0.80
toJSMsgMethod · 0.80

Tested by

no test coverage detected