Messages returns MessagesContext, allowing continuous iteration over messages on a stream. Messages cannot be used concurrently when using an ordered consumer. See [Consumer.Messages] for more details.
(opts ...PullMessagesOpt)
| 502 | // |
| 503 | // See [Consumer.Messages] for more details. |
| 504 | func (p *pullConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, error) { |
| 505 | consumeOpts, err := parseMessagesOpts(false, opts...) |
| 506 | if err != nil { |
| 507 | return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err) |
| 508 | } |
| 509 | |
| 510 | if len(p.info.Config.PriorityGroups) != 0 { |
| 511 | if consumeOpts.Group == "" { |
| 512 | return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "priority group is required for priority consumer") |
| 513 | } |
| 514 | |
| 515 | if !slices.Contains(p.info.Config.PriorityGroups, consumeOpts.Group) { |
| 516 | return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "invalid priority group") |
| 517 | } |
| 518 | } else if consumeOpts.Group != "" { |
| 519 | return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "priority group is not supported for this consumer") |
| 520 | } |
| 521 | |
| 522 | p.Lock() |
| 523 | subject := p.js.apiSubject(fmt.Sprintf(apiRequestNextT, p.stream, p.name)) |
| 524 | |
| 525 | msgs := make(chan *nats.Msg, consumeOpts.MaxMessages) |
| 526 | |
| 527 | consumeID := nuid.Next() |
| 528 | sub := &pullSubscription{ |
| 529 | id: consumeID, |
| 530 | consumer: p, |
| 531 | done: make(chan struct{}, 1), |
| 532 | msgs: msgs, |
| 533 | errs: make(chan error, 10), |
| 534 | fetchNext: make(chan *pullRequest, 1), |
| 535 | consumeOpts: consumeOpts, |
| 536 | } |
| 537 | sub.connStatusChanged = p.js.conn.StatusChanged(nats.CONNECTED, nats.RECONNECTING) |
| 538 | inbox := p.js.conn.NewInbox() |
| 539 | sub.subscription, err = p.js.conn.ChanSubscribe(inbox, sub.msgs) |
| 540 | if err != nil { |
| 541 | p.Unlock() |
| 542 | return nil, err |
| 543 | } |
| 544 | sub.subscription.SetClosedHandler(func(sid string) func(string) { |
| 545 | return func(subject string) { |
| 546 | if sub.draining.Load() != 1 { |
| 547 | // if we're not draining, subscription can be closed as soon |
| 548 | // as closed handler is called |
| 549 | // otherwise, we need to wait until all messages are drained |
| 550 | // in Next |
| 551 | p.subs.Delete(sid) |
| 552 | } |
| 553 | sub.closeMsgs() |
| 554 | } |
| 555 | }(sub.id)) |
| 556 | |
| 557 | p.subs.Store(sub.id, sub) |
| 558 | p.Unlock() |
| 559 | |
| 560 | go sub.pullMessages(subject) |
| 561 |
nothing calls this directly
no test coverage detected