MCPcopy
hub / github.com/nats-io/nats.go / nextMsgWithContext

Method nextMsgWithContext

context.go:106–162  ·  view source on GitHub ↗
(ctx context.Context, pullSubInternal, waitIfNoMsg bool)

Source from the content-addressed store, hash-verified

104}
105
106func (s *Subscription) nextMsgWithContext(ctx context.Context, pullSubInternal, waitIfNoMsg bool) (*Msg, error) {
107 if ctx == nil {
108 return nil, ErrInvalidContext
109 }
110 if s == nil {
111 return nil, ErrBadSubscription
112 }
113 if ctx.Err() != nil {
114 return nil, ctx.Err()
115 }
116
117 s.mu.Lock()
118 err := s.validateNextMsgState(pullSubInternal)
119 if err != nil {
120 s.mu.Unlock()
121 return nil, err
122 }
123
124 // snapshot
125 mch := s.mch
126 s.mu.Unlock()
127
128 var ok bool
129 var msg *Msg
130
131 // If something is available right away, let's optimize that case.
132 select {
133 case msg, ok = <-mch:
134 if !ok {
135 return nil, s.getNextMsgErr()
136 }
137 if err := s.processNextMsgDelivered(msg); err != nil {
138 return nil, err
139 }
140 return msg, nil
141 default:
142 // If internal and we don't want to wait, signal that there is no
143 // message in the internal queue.
144 if pullSubInternal && !waitIfNoMsg {
145 return nil, errNoMessages
146 }
147 }
148
149 select {
150 case msg, ok = <-mch:
151 if !ok {
152 return nil, s.getNextMsgErr()
153 }
154 if err := s.processNextMsgDelivered(msg); err != nil {
155 return nil, err
156 }
157 case <-ctx.Done():
158 return nil, ctx.Err()
159 }
160
161 return msg, nil
162}
163

Callers 3

FetchMethod · 0.95
FetchBatchMethod · 0.95
NextMsgWithContextMethod · 0.95

Calls 5

validateNextMsgStateMethod · 0.95
getNextMsgErrMethod · 0.95
ErrMethod · 0.65
DoneMethod · 0.65

Tested by

no test coverage detected