| 289 | } |
| 290 | |
| 291 | func (s *orderedSubscription) Next(opts ...NextOpt) (Msg, error) { |
| 292 | for { |
| 293 | msg, err := s.consumer.currentSub.Next(opts...) |
| 294 | if err != nil { |
| 295 | // Check for errors which should be returned directly |
| 296 | // without resetting the consumer |
| 297 | if errors.Is(err, ErrInvalidOption) { |
| 298 | return nil, err |
| 299 | } |
| 300 | if errors.Is(err, nats.ErrTimeout) { |
| 301 | return nil, err |
| 302 | } |
| 303 | if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { |
| 304 | return nil, err |
| 305 | } |
| 306 | if errors.Is(err, ErrMsgIteratorClosed) { |
| 307 | s.Stop() |
| 308 | return nil, err |
| 309 | } |
| 310 | if s.consumer.withStopAfter { |
| 311 | select { |
| 312 | case s.consumer.stopAfter = <-s.consumer.stopAfterMsgsLeft: |
| 313 | default: |
| 314 | } |
| 315 | if s.consumer.stopAfter <= 0 { |
| 316 | s.Stop() |
| 317 | return nil, ErrMsgIteratorClosed |
| 318 | } |
| 319 | s.opts[len(s.opts)-1] = StopAfter(s.consumer.stopAfter) |
| 320 | } |
| 321 | if err := s.consumer.reset(); err != nil { |
| 322 | if errors.Is(err, errOrderedConsumerClosed) { |
| 323 | return nil, ErrMsgIteratorClosed |
| 324 | } |
| 325 | return nil, err |
| 326 | } |
| 327 | cc, err := s.consumer.currentConsumer.Messages(s.opts...) |
| 328 | if err != nil { |
| 329 | return nil, err |
| 330 | } |
| 331 | s.consumer.currentSub = cc.(*pullSubscription) |
| 332 | continue |
| 333 | } |
| 334 | |
| 335 | meta, err := msg.Metadata() |
| 336 | if err != nil { |
| 337 | return nil, err |
| 338 | } |
| 339 | serial := serialNumberFromConsumer(meta.Consumer) |
| 340 | if serial != s.consumer.serial { |
| 341 | continue |
| 342 | } |
| 343 | dseq := meta.Sequence.Consumer |
| 344 | if dseq != s.consumer.cursor.deliverSeq+1 { |
| 345 | if err := s.consumer.reset(); err != nil { |
| 346 | if errors.Is(err, errOrderedConsumerClosed) { |
| 347 | return nil, ErrMsgIteratorClosed |
| 348 | } |