MCPcopy
hub / github.com/nats-io/nats.go / NextMsg

Method NextMsg

nats.go:5357–5425  ·  view source on GitHub ↗

NextMsg will return the next message available to a synchronous subscriber or block until one is available. An error is returned if the subscription is invalid (ErrBadSubscription), the connection is closed (ErrConnectionClosed), the timeout is reached (ErrTimeout), or if there were no responders (E

(timeout time.Duration)

Source from the content-addressed store, hash-verified

5355// the connection is closed (ErrConnectionClosed), the timeout is reached (ErrTimeout),
5356// or if there were no responders (ErrNoResponders) when used in the context of a request/reply.
5357func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) {
5358 if s == nil {
5359 return nil, ErrBadSubscription
5360 }
5361
5362 s.mu.Lock()
5363 err := s.validateNextMsgState(false)
5364 if err != nil {
5365 s.mu.Unlock()
5366 return nil, err
5367 }
5368
5369 // snapshot
5370 mch := s.mch
5371 s.mu.Unlock()
5372
5373 var ok bool
5374 var msg *Msg
5375
5376 // If something is available right away, let's optimize that case.
5377 select {
5378 case msg, ok = <-mch:
5379 if !ok {
5380 return nil, s.getNextMsgErr()
5381 }
5382 if err := s.processNextMsgDelivered(msg); err != nil {
5383 return nil, err
5384 } else {
5385 return msg, nil
5386 }
5387 default:
5388 }
5389
5390 // If we are here a message was not immediately available, so lets loop
5391 // with a timeout.
5392
5393 t := globalTimerPool.Get(timeout)
5394 defer globalTimerPool.Put(t)
5395
5396 if s.errCh != nil {
5397 select {
5398 case msg, ok = <-mch:
5399 if !ok {
5400 return nil, s.getNextMsgErr()
5401 }
5402 if err := s.processNextMsgDelivered(msg); err != nil {
5403 return nil, err
5404 }
5405 case err := <-s.errCh:
5406 return nil, err
5407 case <-t.C:
5408 return nil, ErrTimeout
5409 }
5410 } else {
5411 select {
5412 case msg, ok = <-mch:
5413 if !ok {
5414 return nil, s.getNextMsgErr()

Callers 15

MsgsTimeoutMethod · 0.95
TestNilConnectionFunction · 0.95
oldRequestMethod · 0.80
ExampleJetStreamFunction · 0.80
ExampleJSOptFunction · 0.80
ExampleSubOptFunction · 0.80
ExampleAckWaitFunction · 0.80
ExampleMsg_AckSyncFunction · 0.80
ExampleMsg_MetadataFunction · 0.80

Calls 5

validateNextMsgStateMethod · 0.95
getNextMsgErrMethod · 0.95
GetMethod · 0.65
PutMethod · 0.65

Tested by 15

TestNilConnectionFunction · 0.76
ExampleJetStreamFunction · 0.64
ExampleJSOptFunction · 0.64
ExampleSubOptFunction · 0.64
ExampleAckWaitFunction · 0.64
ExampleMsg_AckSyncFunction · 0.64
ExampleMsg_MetadataFunction · 0.64
ExampleAckOptFunction · 0.64
TestClosedConnectionsFunction · 0.64