NextMsg will return the next message available to a synchronous subscriber or block until one is available. An error is returned if the subscription is invalid (ErrBadSubscription), the connection is closed (ErrConnectionClosed), the timeout is reached (ErrTimeout), or if there were no responders (ErrNoResponders) when used in the context of a request/reply.
(timeout time.Duration)
| 5355 | // the connection is closed (ErrConnectionClosed), the timeout is reached (ErrTimeout), |
| 5356 | // or if there were no responders (ErrNoResponders) when used in the context of a request/reply. |
| 5357 | func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) { |
| 5358 | if s == nil { |
| 5359 | return nil, ErrBadSubscription |
| 5360 | } |
| 5361 | |
| 5362 | s.mu.Lock() |
| 5363 | err := s.validateNextMsgState(false) |
| 5364 | if err != nil { |
| 5365 | s.mu.Unlock() |
| 5366 | return nil, err |
| 5367 | } |
| 5368 | |
| 5369 | // snapshot |
| 5370 | mch := s.mch |
| 5371 | s.mu.Unlock() |
| 5372 | |
| 5373 | var ok bool |
| 5374 | var msg *Msg |
| 5375 | |
| 5376 | // If something is available right away, let's optimize that case. |
| 5377 | select { |
| 5378 | case msg, ok = <-mch: |
| 5379 | if !ok { |
| 5380 | return nil, s.getNextMsgErr() |
| 5381 | } |
| 5382 | if err := s.processNextMsgDelivered(msg); err != nil { |
| 5383 | return nil, err |
| 5384 | } else { |
| 5385 | return msg, nil |
| 5386 | } |
| 5387 | default: |
| 5388 | } |
| 5389 | |
| 5390 | // If we are here a message was not immediately available, so lets loop |
| 5391 | // with a timeout. |
| 5392 | |
| 5393 | t := globalTimerPool.Get(timeout) |
| 5394 | defer globalTimerPool.Put(t) |
| 5395 | |
| 5396 | if s.errCh != nil { |
| 5397 | select { |
| 5398 | case msg, ok = <-mch: |
| 5399 | if !ok { |
| 5400 | return nil, s.getNextMsgErr() |
| 5401 | } |
| 5402 | if err := s.processNextMsgDelivered(msg); err != nil { |
| 5403 | return nil, err |
| 5404 | } |
| 5405 | case err := <-s.errCh: |
| 5406 | return nil, err |
| 5407 | case <-t.C: |
| 5408 | return nil, ErrTimeout |
| 5409 | } |
| 5410 | } else { |
| 5411 | select { |
| 5412 | case msg, ok = <-mch: |
| 5413 | if !ok { |
| 5414 | return nil, s.getNextMsgErr() |