@internal — Returns an async generator that yields full wire protocol messages from the underlying socket. This function yields messages until `moreToCome` is false or not present in a response, or the caller cancels the request by calling `return` on the generator. Note that `for-await` loops call `return` automatically when the loop is exited.
(
options: {
timeoutContext?: TimeoutContext;
} & Abortable
)
| 755 | * Note that `for-await` loops call `return` automatically when the loop is exited. |
| 756 | */ |
| 757 | private async *readMany( |
| 758 | options: { |
| 759 | timeoutContext?: TimeoutContext; |
| 760 | } & Abortable |
| 761 | ): AsyncGenerator<OpMsgResponse | OpReply> { |
| 762 | try { |
| 763 | this.dataEvents = onData(this.messageStream, options); |
| 764 | this.messageStream.resume(); |
| 765 | |
| 766 | for await (const message of this.dataEvents) { |
| 767 | const response = await decompressResponse(message); |
| 768 | yield response; |
| 769 | |
| 770 | if (!response.moreToCome) { |
| 771 | return; |
| 772 | } |
| 773 | } |
| 774 | } catch (readError) { |
| 775 | if (TimeoutError.is(readError)) { |
| 776 | const timeoutError = new MongoOperationTimeoutError( |
| 777 | `Timed out during socket read (${readError.duration}ms)` |
| 778 | ); |
| 779 | this.dataEvents = null; |
| 780 | this.onError(timeoutError); |
| 781 | throw timeoutError; |
| 782 | } else if (readError === options.signal?.reason) { |
| 783 | this.onError(readError); |
| 784 | } |
| 785 | throw readError; |
| 786 | } finally { |
| 787 | this.dataEvents = null; |
| 788 | this.messageStream.pause(); |
| 789 | } |
| 790 | } |
| 791 | } |
| 792 | |
| 793 | /** @internal */ |
no test coverage detected