MCPcopy
hub / github.com/nats-io/nats.go / Messages

Method Messages

jetstream/pull.go:504–582  ·  view source on GitHub ↗

Messages returns a MessagesContext, allowing continuous iteration over messages on a stream. Messages cannot be used concurrently when using an ordered consumer. See [Consumer.Messages] for more details.

(opts ...PullMessagesOpt)

Source from the content-addressed store, hash-verified

502//
503// See [Consumer.Messages] for more details.
504func (p *pullConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, error) {
505 consumeOpts, err := parseMessagesOpts(false, opts...)
506 if err != nil {
507 return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err)
508 }
509
510 if len(p.info.Config.PriorityGroups) != 0 {
511 if consumeOpts.Group == "" {
512 return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "priority group is required for priority consumer")
513 }
514
515 if !slices.Contains(p.info.Config.PriorityGroups, consumeOpts.Group) {
516 return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "invalid priority group")
517 }
518 } else if consumeOpts.Group != "" {
519 return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "priority group is not supported for this consumer")
520 }
521
522 p.Lock()
523 subject := p.js.apiSubject(fmt.Sprintf(apiRequestNextT, p.stream, p.name))
524
525 msgs := make(chan *nats.Msg, consumeOpts.MaxMessages)
526
527 consumeID := nuid.Next()
528 sub := &pullSubscription{
529 id: consumeID,
530 consumer: p,
531 done: make(chan struct{}, 1),
532 msgs: msgs,
533 errs: make(chan error, 10),
534 fetchNext: make(chan *pullRequest, 1),
535 consumeOpts: consumeOpts,
536 }
537 sub.connStatusChanged = p.js.conn.StatusChanged(nats.CONNECTED, nats.RECONNECTING)
538 inbox := p.js.conn.NewInbox()
539 sub.subscription, err = p.js.conn.ChanSubscribe(inbox, sub.msgs)
540 if err != nil {
541 p.Unlock()
542 return nil, err
543 }
544 sub.subscription.SetClosedHandler(func(sid string) func(string) {
545 return func(subject string) {
546 if sub.draining.Load() != 1 {
547 // if we're not draining, subscription can be closed as soon
548 // as closed handler is called
549 // otherwise, we need to wait until all messages are drained
550 // in Next
551 p.subs.Delete(sid)
552 }
553 sub.closeMsgs()
554 }
555 }(sub.id))
556
557 p.subs.Store(sub.id, sub)
558 p.Unlock()
559
560 go sub.pullMessages(subject)
561

Callers

nothing calls this directly

Calls 13

closeMsgsMethod · 0.95
pullMessagesMethod · 0.95
parseMessagesOptsFunction · 0.85
ErrorfMethod · 0.80
apiSubjectMethod · 0.80
NewInboxMethod · 0.80
LoadMethod · 0.80
StoreMethod · 0.80
NextMethod · 0.65
ChanSubscribeMethod · 0.65
DeleteMethod · 0.65
StatusChangedMethod · 0.45

Tested by

no test coverage detected