* @internal * * Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. This method * waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired).
(
command: WriteProtocolMessageType,
options: {
agreedCompressor?: CompressorName;
zlibCompressionLevel?: number;
timeoutContext?: TimeoutContext;
} & Abortable
)
| 688 | * waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired). |
| 689 | */ |
| 690 | private async writeCommand( |
| 691 | command: WriteProtocolMessageType, |
| 692 | options: { |
| 693 | agreedCompressor?: CompressorName; |
| 694 | zlibCompressionLevel?: number; |
| 695 | timeoutContext?: TimeoutContext; |
| 696 | } & Abortable |
| 697 | ): Promise<void> { |
| 698 | const finalCommand = |
| 699 | options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command) |
| 700 | ? command |
| 701 | : new OpCompressedRequest(command, { |
| 702 | agreedCompressor: options.agreedCompressor ?? 'none', |
| 703 | zlibCompressionLevel: options.zlibCompressionLevel ?? 0 |
| 704 | }); |
| 705 | |
| 706 | const buffer = ByteUtils.concat(await finalCommand.toBin()); |
| 707 | |
| 708 | if (options.timeoutContext?.csotEnabled()) { |
| 709 | if ( |
| 710 | options.timeoutContext.minRoundTripTime != null && |
| 711 | options.timeoutContext.remainingTimeMS < options.timeoutContext.minRoundTripTime |
| 712 | ) { |
| 713 | throw new MongoOperationTimeoutError( |
| 714 | 'Server roundtrip time is greater than the time remaining' |
| 715 | ); |
| 716 | } |
| 717 | } |
| 718 | |
| 719 | try { |
| 720 | if (this.socket.write(buffer)) return; |
| 721 | } catch (writeError) { |
| 722 | const networkError = new MongoNetworkError('unexpected error writing to socket', { |
| 723 | cause: writeError |
| 724 | }); |
| 725 | this.onError(networkError); |
| 726 | throw networkError; |
| 727 | } |
| 728 | |
| 729 | const drainEvent = once<void>(this.socket, 'drain', options); |
| 730 | const timeout = options?.timeoutContext?.timeoutForSocketWrite; |
| 731 | const drained = timeout ? Promise.race([drainEvent, timeout]) : drainEvent; |
| 732 | try { |
| 733 | return await drained; |
| 734 | } catch (writeError) { |
| 735 | if (TimeoutError.is(writeError)) { |
| 736 | const timeoutError = new MongoOperationTimeoutError('Timed out at socket write'); |
| 737 | this.onError(timeoutError); |
| 738 | throw timeoutError; |
| 739 | } else if (writeError === options.signal?.reason) { |
| 740 | this.onError(writeError); |
| 741 | } |
| 742 | throw writeError; |
| 743 | } finally { |
| 744 | timeout?.clear(); |
| 745 | } |
| 746 | } |
| 747 |
no test coverage detected