(subject string)
| 1038 | } |
| 1039 | |
| 1040 | func (s *pullSubscription) pullMessages(subject string) { |
| 1041 | for { |
| 1042 | select { |
| 1043 | case req := <-s.fetchNext: |
| 1044 | s.fetchInProgress.Store(1) |
| 1045 | |
| 1046 | if err := s.pull(req, subject); err != nil { |
| 1047 | if errors.Is(err, ErrMsgIteratorClosed) { |
| 1048 | s.cleanup() |
| 1049 | return |
| 1050 | } |
| 1051 | s.errs <- err |
| 1052 | } |
| 1053 | s.fetchInProgress.Store(0) |
| 1054 | case <-s.done: |
| 1055 | s.cleanup() |
| 1056 | return |
| 1057 | } |
| 1058 | } |
| 1059 | } |
| 1060 | |
| 1061 | func (s *pullSubscription) closeMsgs() { |
| 1062 | if s.msgsClosed.CompareAndSwap(0, 1) { |