MCPcopy
hub / github.com/nats-io/nats.go / pull

Method pull

jetstream/pull.go:1105–1124  ·  view source on GitHub ↗

pull sends a pull request to the server and waits for messages using a subscription from [pullSubscription]. Messages will be fetched up to the given batch size, until there are no more messages, or until a timeout is returned.

(req *pullRequest, subject string)

Source from the content-addressed store, hash-verified

1103// pull sends a pull request to the server and waits for messages using a subscription from [pullSubscription].
1104// Messages will be fetched up to given batch_size or until there are no more messages or timeout is returned
1105func (s *pullSubscription) pull(req *pullRequest, subject string) error {
1106 s.consumer.Lock()
1107 defer s.consumer.Unlock()
1108 if s.closed.Load() == 1 {
1109 return ErrMsgIteratorClosed
1110 }
1111 if req.Batch < 1 {
1112 return fmt.Errorf("%w: batch size must be at least 1", nats.ErrInvalidArg)
1113 }
1114 reqJSON, err := json.Marshal(req)
1115 if err != nil {
1116 return err
1117 }
1118 reply := s.subscription.Subject
1119
1120 if err := s.consumer.js.conn.PublishRequest(subject, reply, reqJSON); err != nil {
1121 return err
1122 }
1123 return nil
1124}
1125
1126func parseConsumeOpts(ordered bool, opts ...PullConsumeOpt) (*consumeOpts, error) {
1127 consumeOpts := &consumeOpts{

Callers 3

ConsumeMethod · 0.95
fetchMethod · 0.95
pullMessagesMethod · 0.95

Calls 3

LoadMethod · 0.80
ErrorfMethod · 0.80
PublishRequestMethod · 0.45

Tested by

no test coverage detected