Drain unsubscribes from the stream and cancels the subscription. All messages that are already in the buffer will be available on subsequent calls to Next. After the buffer is drained, Next will return the ErrMsgIteratorClosed error.
()
| 784 | // subsequent calls to Next. After the buffer is drained, Next will |
| 785 | // return ErrMsgIteratorClosed error. |
| 786 | func (s *pullSubscription) Drain() { |
| 787 | if !s.closed.CompareAndSwap(0, 1) { |
| 788 | return |
| 789 | } |
| 790 | s.draining.Store(1) |
| 791 | close(s.done) |
| 792 | if s.consumeOpts.stopAfterMsgsLeft != nil { |
| 793 | if s.delivered >= s.consumeOpts.StopAfter { |
| 794 | close(s.consumeOpts.stopAfterMsgsLeft) |
| 795 | } else { |
| 796 | s.consumeOpts.stopAfterMsgsLeft <- s.consumeOpts.StopAfter - s.delivered |
| 797 | } |
| 798 | } |
| 799 | } |
| 800 | |
| 801 | // Closed returns a channel that is closed when consuming is |
| 802 | // fully stopped/drained. When the channel is closed, no more messages |
nothing calls this directly
no test coverage detected