checkPending verifies whether there are enough messages in the buffer to trigger a new pull request. lock should be held before calling this method
()
| 452 | // the buffer to trigger a new pull request. |
| 453 | // lock should be held before calling this method |
| 454 | func (s *pullSubscription) checkPending() { |
| 455 | // check if we went below any threshold |
| 456 | // we don't want to track bytes threshold if either it's not set or we used |
| 457 | // PullMaxMessagesWithBytesLimit |
| 458 | if (s.pending.msgCount < s.consumeOpts.ThresholdMessages || |
| 459 | (s.pending.byteCount < s.consumeOpts.ThresholdBytes && s.consumeOpts.MaxBytes != 0 && !s.consumeOpts.LimitSize)) && |
| 460 | s.fetchInProgress.Load() == 0 { |
| 461 | |
| 462 | var batchSize, maxBytes int |
| 463 | batchSize = s.consumeOpts.MaxMessages - s.pending.msgCount |
| 464 | if s.consumeOpts.MaxBytes != 0 { |
| 465 | if s.consumeOpts.LimitSize { |
| 466 | maxBytes = s.consumeOpts.MaxBytes |
| 467 | } else { |
| 468 | maxBytes = s.consumeOpts.MaxBytes - s.pending.byteCount |
| 469 | // when working with max bytes only, always ask for full batch |
| 470 | batchSize = s.consumeOpts.MaxMessages |
| 471 | } |
| 472 | } |
| 473 | if s.consumeOpts.StopAfter > 0 { |
| 474 | batchSize = min(batchSize, s.consumeOpts.StopAfter-s.delivered-s.pending.msgCount) |
| 475 | } |
| 476 | if batchSize > 0 { |
| 477 | pinID := "" |
| 478 | if s.consumer != nil { |
| 479 | pinID = s.consumer.getPinID() |
| 480 | } |
| 481 | s.fetchNext <- &pullRequest{ |
| 482 | Expires: s.consumeOpts.Expires, |
| 483 | Batch: batchSize, |
| 484 | MaxBytes: maxBytes, |
| 485 | Heartbeat: s.consumeOpts.Heartbeat, |
| 486 | PinID: pinID, |
| 487 | Group: s.consumeOpts.Group, |
| 488 | MinPending: s.consumeOpts.MinPending, |
| 489 | MinAckPending: s.consumeOpts.MinAckPending, |
| 490 | Priority: s.consumeOpts.Priority, |
| 491 | } |
| 492 | |
| 493 | s.pending.msgCount = s.consumeOpts.MaxMessages |
| 494 | s.pending.byteCount = s.consumeOpts.MaxBytes |
| 495 | } |
| 496 | } |
| 497 | } |
| 498 | |
| 499 | // Messages returns MessagesContext, allowing continuously iterating |
| 500 | // over messages on a stream. Messages cannot be used concurrently |