MCPcopy
hub / github.com/nats-io/nats.go / Consume

Method Consume

jetstream/pull.go:202–426  ·  jetstream/pull.go::pullConsumer.Consume

Consume can be used to continuously receive messages and handle them with the provided callback function. Consume cannot be used concurrently when using an ordered consumer. See [Consumer.Consume] for more details.

(handler MessageHandler, opts ...PullConsumeOpt)

Source from the content-addressed store, hash-verified

200//
201// See [Consumer.Consume] for more details.
202func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (ConsumeContext, error) {
203 if handler == nil {
204 return nil, ErrHandlerRequired
205 }
206 consumeOpts, err := parseConsumeOpts(false, opts...)
207 if err != nil {
208 return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err)
209 }
210
211 if len(p.info.Config.PriorityGroups) != 0 {
212 if consumeOpts.Group == "" {
213 return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "priority group is required for priority consumer")
214 }
215
216 if !slices.Contains(p.info.Config.PriorityGroups, consumeOpts.Group) {
217 return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "invalid priority group")
218 }
219 } else if consumeOpts.Group != "" {
220 return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "priority group is not supported for this consumer")
221 }
222
223 p.Lock()
224
225 subject := p.js.apiSubject(fmt.Sprintf(apiRequestNextT, p.stream, p.name))
226
227 consumeID := nuid.Next()
228 sub := &pullSubscription{
229 id: consumeID,
230 consumer: p,
231 errs: make(chan error, 10),
232 done: make(chan struct{}, 1),
233 fetchNext: make(chan *pullRequest, 1),
234 consumeOpts: consumeOpts,
235 }
236 sub.connStatusChanged = p.js.conn.StatusChanged(nats.CONNECTED, nats.RECONNECTING, nats.CLOSED)
237
238 sub.hbMonitor = sub.scheduleHeartbeatCheck(consumeOpts.Heartbeat)
239
240 p.subs.Store(sub.id, sub)
241 p.Unlock()
242
243 internalHandler := func(msg *nats.Msg) {
244 if sub.hbMonitor != nil {
245 sub.hbMonitor.Stop()
246 }
247 userMsg, msgErr := checkMsg(msg)
248 if !userMsg && msgErr == nil {
249 if sub.hbMonitor != nil {
250 sub.hbMonitor.Reset(2 * consumeOpts.Heartbeat)
251 }
252 return
253 }
254 if !userMsg {
255 // heartbeat message
256 if msgErr == nil {
257 return
258 }
259

Callers

nothing calls this directly

Calls 15

handleStatusMsg (Method) · 0.95
checkPending (Method) · 0.95
Stop (Method) · 0.95
setPinID (Method) · 0.95
decrementPendingMsgs (Method) · 0.95
resetPendingMsgs (Method) · 0.95
pull (Method) · 0.95
getPinID (Method) · 0.95
pullMessages (Method) · 0.95
parseConsumeOpts (Function) · 0.85

Tested by

no test coverage detected