MCPcopy
hub / github.com/nats-io/nats.go / nextMsgNoTimeout

Method nextMsgNoTimeout

nats.go:5429–5485  ·  view source on GitHub ↗

nextMsgNoTimeout works similarly to Subscription.NextMsg() but will not time out. It is only used internally for non-timeout subscription iterator.

()

Source from the content-addressed store, hash-verified

5427// nextMsgNoTimeout works similarly to Subscription.NextMsg() but will not
5428// time out. It is only used internally for non-timeout subscription iterator.
5429func (s *Subscription) nextMsgNoTimeout() (*Msg, error) {
5430 if s == nil {
5431 return nil, ErrBadSubscription
5432 }
5433
5434 s.mu.Lock()
5435 err := s.validateNextMsgState(false)
5436 if err != nil {
5437 s.mu.Unlock()
5438 return nil, err
5439 }
5440
5441 // snapshot
5442 mch := s.mch
5443 s.mu.Unlock()
5444
5445 var ok bool
5446 var msg *Msg
5447
5448 // If something is available right away, let's optimize that case.
5449 select {
5450 case msg, ok = <-mch:
5451 if !ok {
5452 return nil, s.getNextMsgErr()
5453 }
5454 if err := s.processNextMsgDelivered(msg); err != nil {
5455 return nil, err
5456 } else {
5457 return msg, nil
5458 }
5459 default:
5460 }
5461
5462 if s.errCh != nil {
5463 select {
5464 case msg, ok = <-mch:
5465 if !ok {
5466 return nil, s.getNextMsgErr()
5467 }
5468 if err := s.processNextMsgDelivered(msg); err != nil {
5469 return nil, err
5470 }
5471 case err := <-s.errCh:
5472 return nil, err
5473 }
5474 } else {
5475 msg, ok = <-mch
5476 if !ok {
5477 return nil, s.getNextMsgErr()
5478 }
5479 if err := s.processNextMsgDelivered(msg); err != nil {
5480 return nil, err
5481 }
5482 }
5483
5484 return msg, nil
5485}
5486

Callers 1

MsgsMethod · 0.95

Calls 3

validateNextMsgStateMethod · 0.95
getNextMsgErrMethod · 0.95

Tested by

no test coverage detected