MCPcopy
hub / github.com/nats-io/nats.go / pullMessages

Method pullMessages

jetstream/pull.go:1040–1059  ·  view source on GitHub ↗
(subject string)

Source from the content-addressed store, hash-verified

1038}
1039
1040func (s *pullSubscription) pullMessages(subject string) {
1041 for {
1042 select {
1043 case req := <-s.fetchNext:
1044 s.fetchInProgress.Store(1)
1045
1046 if err := s.pull(req, subject); err != nil {
1047 if errors.Is(err, ErrMsgIteratorClosed) {
1048 s.cleanup()
1049 return
1050 }
1051 s.errs <- err
1052 }
1053 s.fetchInProgress.Store(0)
1054 case <-s.done:
1055 s.cleanup()
1056 return
1057 }
1058 }
1059}
1060
1061func (s *pullSubscription) closeMsgs() {
1062 if s.msgsClosed.CompareAndSwap(0, 1) {

Callers 2

Consume (Method) · 0.95
Messages (Method) · 0.95

Calls 4

pull (Method) · 0.95
cleanup (Method) · 0.95
Store (Method) · 0.80
Is (Method) · 0.45

Tested by

no test coverage detected