MCPcopy
hub / github.com/nats-io/nats.go / Next

Method Next

jetstream/ordered.go:291–362  ·  view source on GitHub ↗
(opts ...NextOpt)

Source from the content-addressed store, hash-verified

289}
290
291func (s *orderedSubscription) Next(opts ...NextOpt) (Msg, error) {
292 for {
293 msg, err := s.consumer.currentSub.Next(opts...)
294 if err != nil {
295 // Check for errors which should be returned directly
296 // without resetting the consumer
297 if errors.Is(err, ErrInvalidOption) {
298 return nil, err
299 }
300 if errors.Is(err, nats.ErrTimeout) {
301 return nil, err
302 }
303 if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
304 return nil, err
305 }
306 if errors.Is(err, ErrMsgIteratorClosed) {
307 s.Stop()
308 return nil, err
309 }
310 if s.consumer.withStopAfter {
311 select {
312 case s.consumer.stopAfter = <-s.consumer.stopAfterMsgsLeft:
313 default:
314 }
315 if s.consumer.stopAfter <= 0 {
316 s.Stop()
317 return nil, ErrMsgIteratorClosed
318 }
319 s.opts[len(s.opts)-1] = StopAfter(s.consumer.stopAfter)
320 }
321 if err := s.consumer.reset(); err != nil {
322 if errors.Is(err, errOrderedConsumerClosed) {
323 return nil, ErrMsgIteratorClosed
324 }
325 return nil, err
326 }
327 cc, err := s.consumer.currentConsumer.Messages(s.opts...)
328 if err != nil {
329 return nil, err
330 }
331 s.consumer.currentSub = cc.(*pullSubscription)
332 continue
333 }
334
335 meta, err := msg.Metadata()
336 if err != nil {
337 return nil, err
338 }
339 serial := serialNumberFromConsumer(meta.Consumer)
340 if serial != s.consumer.serial {
341 continue
342 }
343 dseq := meta.Sequence.Consumer
344 if dseq != s.consumer.cursor.deliverSeq+1 {
345 if err := s.consumer.reset(); err != nil {
346 if errors.Is(err, errOrderedConsumerClosed) {
347 return nil, ErrMsgIteratorClosed
348 }

Callers

nothing calls this directly

Calls 8

Stop (Method) · 0.95
StopAfter (TypeAlias) · 0.85
serialNumberFromConsumer (Function) · 0.85
Next (Method) · 0.65
Messages (Method) · 0.65
Metadata (Method) · 0.65
Is (Method) · 0.45
reset (Method) · 0.45

Tested by

no test coverage detected