(ctx context.Context, pullSubInternal, waitIfNoMsg bool)
| 104 | } |
| 105 | |
| 106 | func (s *Subscription) nextMsgWithContext(ctx context.Context, pullSubInternal, waitIfNoMsg bool) (*Msg, error) { |
| 107 | if ctx == nil { |
| 108 | return nil, ErrInvalidContext |
| 109 | } |
| 110 | if s == nil { |
| 111 | return nil, ErrBadSubscription |
| 112 | } |
| 113 | if ctx.Err() != nil { |
| 114 | return nil, ctx.Err() |
| 115 | } |
| 116 | |
| 117 | s.mu.Lock() |
| 118 | err := s.validateNextMsgState(pullSubInternal) |
| 119 | if err != nil { |
| 120 | s.mu.Unlock() |
| 121 | return nil, err |
| 122 | } |
| 123 | |
| 124 | // snapshot |
| 125 | mch := s.mch |
| 126 | s.mu.Unlock() |
| 127 | |
| 128 | var ok bool |
| 129 | var msg *Msg |
| 130 | |
| 131 | // If something is available right away, let's optimize that case. |
| 132 | select { |
| 133 | case msg, ok = <-mch: |
| 134 | if !ok { |
| 135 | return nil, s.getNextMsgErr() |
| 136 | } |
| 137 | if err := s.processNextMsgDelivered(msg); err != nil { |
| 138 | return nil, err |
| 139 | } |
| 140 | return msg, nil |
| 141 | default: |
| 142 | // If internal and we don't want to wait, signal that there is no |
| 143 | // message in the internal queue. |
| 144 | if pullSubInternal && !waitIfNoMsg { |
| 145 | return nil, errNoMessages |
| 146 | } |
| 147 | } |
| 148 | |
| 149 | select { |
| 150 | case msg, ok = <-mch: |
| 151 | if !ok { |
| 152 | return nil, s.getNextMsgErr() |
| 153 | } |
| 154 | if err := s.processNextMsgDelivered(msg); err != nil { |
| 155 | return nil, err |
| 156 | } |
| 157 | case <-ctx.Done(): |
| 158 | return nil, ctx.Err() |
| 159 | } |
| 160 | |
| 161 | return msg, nil |
| 162 | } |
| 163 |
no test coverage detected