| 802 | } |
| 803 | |
/**
 * Splits the incoming byte stream into complete, size-prefixed messages.
 *
 * Every chunk is appended to `bufferPool`. While the pool holds at least one whole
 * message (as announced by the int32 returned from `bufferPool.getInt32()`), that
 * message is read out of the pool and pushed downstream. An incomplete trailing
 * message is left in the pool until a later chunk completes it.
 *
 * @param chunk - Bytes to append to the buffer pool.
 * @param encoding - Unused.
 * @param callback - Invoked exactly once per call: with no argument on success, with a
 *   `MongoParseError` when the announced message size is negative or zero, or with a
 *   `MongoRuntimeError` when `push` reports backpressure.
 */
override _transform(chunk: Uint8Array, encoding: unknown, callback: TransformCallback): void {
  // A chunk has arrived: cancel the connection's pending delayed timeout, if any.
  if (this.connection.delayedTimeoutId != null) {
    clearTimeout(this.connection.delayedTimeoutId);
    this.connection.delayedTimeoutId = null;
  }

  this.bufferPool.append(chunk);

  // Keep extracting messages for as long as there are any bytes in the buffer.
  while (this.bufferPool.length) {
    // Try to fetch a size from the top 4 bytes
    const sizeOfMessage = this.bufferPool.getInt32();

    if (sizeOfMessage == null) {
      // Not even an int32 worth of data. Stop the loop, we need more chunks.
      break;
    }

    if (sizeOfMessage < 0) {
      // The size in the message has a negative value, this is probably corruption.
      return callback(new MongoParseError(`Message size cannot be negative: ${sizeOfMessage}`));
    }

    if (sizeOfMessage === 0) {
      // Reading zero bytes cannot shrink the pool, so without this guard the loop
      // would keep seeing the same size prefix and never terminate.
      return callback(new MongoParseError('Message size cannot be zero'));
    }

    if (sizeOfMessage > this.bufferPool.length) {
      // We do not have enough bytes to make a sizeOfMessage chunk yet.
      break;
    }

    // Add a message to the stream
    const message = this.bufferPool.read(sizeOfMessage);

    if (!this.push(message)) {
      // We only subscribe to data events so we should never get backpressure;
      // if we do, we do not have the handling for it.
      return callback(
        new MongoRuntimeError(`SizedMessageTransform does not support backpressure`)
      );
    }
  }

  callback();
}
| 847 | } |
| 848 | |
| 849 | /** @internal */ |