MCPcopy
hub / github.com/nats-io/nats.go / fetch

Method fetch

jetstream/pull.go:899–1002  ·  view source on GitHub ↗
(req *pullRequest)

Source from the content-addressed store, hash-verified

897}
898
899func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) {
900 res := &fetchResult{
901 msgs: make(chan Msg, req.Batch),
902 }
903 msgs := make(chan *nats.Msg, 2*req.Batch)
904 subject := p.js.apiSubject(fmt.Sprintf(apiRequestNextT, p.stream, p.name))
905
906 sub := &pullSubscription{
907 consumer: p,
908 done: make(chan struct{}, 1),
909 msgs: msgs,
910 errs: make(chan error, 10),
911 }
912 inbox := p.js.conn.NewInbox()
913 var err error
914 sub.subscription, err = p.js.conn.ChanSubscribe(inbox, sub.msgs)
915 if err != nil {
916 return nil, err
917 }
918 req.PinID = p.getPinID()
919 if err := sub.pull(req, subject); err != nil {
920 return nil, err
921 }
922
923 var receivedMsgs, receivedBytes int
924 hbTimer := sub.scheduleHeartbeatCheck(req.Heartbeat)
925
926 // Use context if provided
927 var ctxDone <-chan struct{}
928 if req.ctx != nil {
929 ctxDone = req.ctx.Done()
930 }
931
932 go func(res *fetchResult) {
933 defer sub.subscription.Unsubscribe()
934 defer close(res.msgs)
935 for {
936 select {
937 case msg := <-msgs:
938 res.Lock()
939 if hbTimer != nil {
940 hbTimer.Reset(2 * req.Heartbeat)
941 }
942 userMsg, err := checkMsg(msg)
943 if err != nil {
944 errNotTimeoutOrNoMsgs := !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, ErrNoMessages)
945 if errNotTimeoutOrNoMsgs && !errors.Is(err, ErrMaxBytesExceeded) {
946 res.err = err
947 }
948 if errors.Is(err, ErrPinIDMismatch) {
949 p.setPinID("")
950 }
951 res.done = true
952 res.Unlock()
953 return
954 }
955 if !userMsg {
956 res.Unlock()

Callers 3

Fetch (Method) · 0.95
FetchBytes (Method) · 0.95
FetchNoWait (Method) · 0.95

Calls 15 (11 shown)

getPinID (Method) · 0.95
pull (Method) · 0.95
setPinID (Method) · 0.95
apiSubject (Method) · 0.80
NewInbox (Method) · 0.80
Unsubscribe (Method) · 0.80
toJSMsg (Method) · 0.80
Errorf (Method) · 0.80
checkMsg (Function) · 0.70
ChanSubscribe (Method) · 0.65
Done (Method) · 0.65

Tested by

no test coverage detected