* Create a new change stream cursor based on self's configuration * @internal
(
options: ChangeStreamOptions | ChangeStreamCursorOptions
)
| 900 | * @internal |
| 901 | */ |
| 902 | private _createChangeStreamCursor( |
| 903 | options: ChangeStreamOptions | ChangeStreamCursorOptions |
| 904 | ): ChangeStreamCursor<TSchema, TChange> { |
| 905 | const changeStreamStageOptions: Document = filterOutOptions(options); |
| 906 | if (this.type === CHANGE_DOMAIN_TYPES.CLUSTER) { |
| 907 | changeStreamStageOptions.allChangesForCluster = true; |
| 908 | } |
| 909 | const pipeline = [{ $changeStream: changeStreamStageOptions }, ...this.pipeline]; |
| 910 | |
| 911 | const client: MongoClient | null = |
| 912 | this.type === CHANGE_DOMAIN_TYPES.CLUSTER |
| 913 | ? (this.parent as MongoClient) |
| 914 | : this.type === CHANGE_DOMAIN_TYPES.DATABASE |
| 915 | ? (this.parent as Db).client |
| 916 | : this.type === CHANGE_DOMAIN_TYPES.COLLECTION |
| 917 | ? (this.parent as Collection).client |
| 918 | : null; |
| 919 | |
| 920 | if (client == null) { |
| 921 | // This should never happen because of the assertion in the constructor |
| 922 | throw new MongoRuntimeError( |
| 923 | `Changestream type should only be one of cluster, database, collection. Found ${this.type.toString()}` |
| 924 | ); |
| 925 | } |
| 926 | |
| 927 | const changeStreamCursor = new ChangeStreamCursor<TSchema, TChange>( |
| 928 | client, |
| 929 | this.namespace, |
| 930 | pipeline, |
| 931 | { |
| 932 | ...options, |
| 933 | timeoutContext: this.timeoutContext |
| 934 | ? new CursorTimeoutContext(this.timeoutContext, this.contextOwner) |
| 935 | : undefined |
| 936 | } |
| 937 | ); |
| 938 | |
| 939 | for (const event of CHANGE_STREAM_EVENTS) { |
| 940 | changeStreamCursor.on(event, e => this.emit(event, e)); |
| 941 | } |
| 942 | |
| 943 | if (this.listenerCount(ChangeStream.CHANGE) > 0) { |
| 944 | this._streamEvents(changeStreamCursor); |
| 945 | } |
| 946 | |
| 947 | return changeStreamCursor; |
| 948 | } |
| 949 | |
| 950 | /** @internal */ |
| 951 | private _closeEmitterModeWithError(error: AnyError): void { |
no test coverage detected