pull sends a pull request to the server and waits for messages using a subscription from [pullSubscription]. Messages will be fetched up to given batch_size or until there are no more messages or timeout is returned
(req *pullRequest, subject string)
| 1103 | // pull sends a pull request to the server and waits for messages using a subscription from [pullSubscription]. |
| 1104 | // Messages will be fetched up to given batch_size or until there are no more messages or timeout is returned |
| 1105 | func (s *pullSubscription) pull(req *pullRequest, subject string) error { |
| 1106 | s.consumer.Lock() |
| 1107 | defer s.consumer.Unlock() |
| 1108 | if s.closed.Load() == 1 { |
| 1109 | return ErrMsgIteratorClosed |
| 1110 | } |
| 1111 | if req.Batch < 1 { |
| 1112 | return fmt.Errorf("%w: batch size must be at least 1", nats.ErrInvalidArg) |
| 1113 | } |
| 1114 | reqJSON, err := json.Marshal(req) |
| 1115 | if err != nil { |
| 1116 | return err |
| 1117 | } |
| 1118 | reply := s.subscription.Subject |
| 1119 | |
| 1120 | if err := s.consumer.js.conn.PublishRequest(subject, reply, reqJSON); err != nil { |
| 1121 | return err |
| 1122 | } |
| 1123 | return nil |
| 1124 | } |
| 1125 | |
| 1126 | func parseConsumeOpts(ordered bool, opts ...PullConsumeOpt) (*consumeOpts, error) { |
| 1127 | consumeOpts := &consumeOpts{ |
no test coverage detected