MCPcopy
hub / github.com/nats-io/nats.go / checkPending

Method checkPending

jetstream/pull.go:454–497  ·  view source on GitHub ↗

checkPending verifies whether there are enough messages in the buffer to trigger a new pull request. lock should be held before calling this method

()

Source from the content-addressed store, hash-verified

452// the buffer to trigger a new pull request.
453// lock should be held before calling this method
454func (s *pullSubscription) checkPending() {
455 // check if we went below any threshold
456 // we don't want to track bytes threshold if either it's not set or we used
457 // PullMaxMessagesWithBytesLimit
458 if (s.pending.msgCount < s.consumeOpts.ThresholdMessages ||
459 (s.pending.byteCount < s.consumeOpts.ThresholdBytes && s.consumeOpts.MaxBytes != 0 && !s.consumeOpts.LimitSize)) &&
460 s.fetchInProgress.Load() == 0 {
461
462 var batchSize, maxBytes int
463 batchSize = s.consumeOpts.MaxMessages - s.pending.msgCount
464 if s.consumeOpts.MaxBytes != 0 {
465 if s.consumeOpts.LimitSize {
466 maxBytes = s.consumeOpts.MaxBytes
467 } else {
468 maxBytes = s.consumeOpts.MaxBytes - s.pending.byteCount
469 // when working with max bytes only, always ask for full batch
470 batchSize = s.consumeOpts.MaxMessages
471 }
472 }
473 if s.consumeOpts.StopAfter > 0 {
474 batchSize = min(batchSize, s.consumeOpts.StopAfter-s.delivered-s.pending.msgCount)
475 }
476 if batchSize > 0 {
477 pinID := ""
478 if s.consumer != nil {
479 pinID = s.consumer.getPinID()
480 }
481 s.fetchNext <- &pullRequest{
482 Expires: s.consumeOpts.Expires,
483 Batch: batchSize,
484 MaxBytes: maxBytes,
485 Heartbeat: s.consumeOpts.Heartbeat,
486 PinID: pinID,
487 Group: s.consumeOpts.Group,
488 MinPending: s.consumeOpts.MinPending,
489 MinAckPending: s.consumeOpts.MinAckPending,
490 Priority: s.consumeOpts.Priority,
491 }
492
493 s.pending.msgCount = s.consumeOpts.MaxMessages
494 s.pending.byteCount = s.consumeOpts.MaxBytes
495 }
496 }
497}
498
499// Messages returns MessagesContext, allowing continuously iterating
500// over messages on a stream. Messages cannot be used concurrently

Callers 3

ConsumeMethod · 0.95
NextMethod · 0.95

Calls 2

LoadMethod · 0.80
getPinIDMethod · 0.80

Tested by 1