nextMsgNoTimeout works similarly to Subscription.NextMsg() but will not time out. It is only used internally for non-timeout subscription iterator.
()
| 5427 | // nextMsgNoTimeout works similarly to Subscription.NextMsg() but will not |
| 5428 | // time out. It is only used internally for non-timeout subscription iterator. |
| 5429 | func (s *Subscription) nextMsgNoTimeout() (*Msg, error) { |
| 5430 | if s == nil { |
| 5431 | return nil, ErrBadSubscription |
| 5432 | } |
| 5433 | |
| 5434 | s.mu.Lock() |
| 5435 | err := s.validateNextMsgState(false) |
| 5436 | if err != nil { |
| 5437 | s.mu.Unlock() |
| 5438 | return nil, err |
| 5439 | } |
| 5440 | |
| 5441 | // snapshot |
| 5442 | mch := s.mch |
| 5443 | s.mu.Unlock() |
| 5444 | |
| 5445 | var ok bool |
| 5446 | var msg *Msg |
| 5447 | |
| 5448 | // If something is available right away, let's optimize that case. |
| 5449 | select { |
| 5450 | case msg, ok = <-mch: |
| 5451 | if !ok { |
| 5452 | return nil, s.getNextMsgErr() |
| 5453 | } |
| 5454 | if err := s.processNextMsgDelivered(msg); err != nil { |
| 5455 | return nil, err |
| 5456 | } else { |
| 5457 | return msg, nil |
| 5458 | } |
| 5459 | default: |
| 5460 | } |
| 5461 | |
| 5462 | if s.errCh != nil { |
| 5463 | select { |
| 5464 | case msg, ok = <-mch: |
| 5465 | if !ok { |
| 5466 | return nil, s.getNextMsgErr() |
| 5467 | } |
| 5468 | if err := s.processNextMsgDelivered(msg); err != nil { |
| 5469 | return nil, err |
| 5470 | } |
| 5471 | case err := <-s.errCh: |
| 5472 | return nil, err |
| 5473 | } |
| 5474 | } else { |
| 5475 | msg, ok = <-mch |
| 5476 | if !ok { |
| 5477 | return nil, s.getNextMsgErr() |
| 5478 | } |
| 5479 | if err := s.processNextMsgDelivered(msg); err != nil { |
| 5480 | return nil, err |
| 5481 | } |
| 5482 | } |
| 5483 | |
| 5484 | return msg, nil |
| 5485 | } |
| 5486 |
no test coverage detected