MCPcopy
hub / github.com/nats-io/nats.go / Fetch

Method Fetch

jetstream/ordered.go:420–451  ·  view source on GitHub ↗

Fetch is used to retrieve up to a provided number of messages from a stream. This method will always send a single request and wait until either all messages are retrieved or request times out. It is not efficient to use Fetch with on an ordered consumer, as it will reset the consumer for each subs

(batch int, opts ...FetchOpt)

Source from the content-addressed store, hash-verified

418// reset the consumer for each subsequent Fetch call.
419// Consider using [Consumer.Consume] or [Consumer.Messages] instead.
420func (c *orderedConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error) {
421 c.Lock()
422 if c.consumerType == consumerTypeConsume {
423 c.Unlock()
424 return nil, ErrOrderConsumerUsedAsConsume
425 }
426 if c.runningFetch != nil {
427 if !c.runningFetch.closed() {
428 return nil, ErrOrderedConsumerConcurrentRequests
429 }
430 if c.runningFetch.sseq != 0 {
431 c.cursor.streamSeq = c.runningFetch.sseq
432 }
433 }
434 c.consumerType = consumerTypeFetch
435 sub := orderedSubscription{
436 consumer: c,
437 done: make(chan struct{}),
438 }
439 c.subscription = &sub
440 c.Unlock()
441 err := c.reset()
442 if err != nil {
443 return nil, err
444 }
445 msgs, err := c.currentConsumer.Fetch(batch, opts...)
446 if err != nil {
447 return nil, err
448 }
449 c.runningFetch = msgs.(*fetchResult)
450 return msgs, nil
451}
452
453// FetchBytes is used to retrieve up to a provided bytes from the
454// stream. This method will always send a single request and wait until

Callers 1

NextMethod · 0.95

Calls 3

resetMethod · 0.95
closedMethod · 0.80
FetchMethod · 0.65

Tested by

no test coverage detected