@internal
(cursor: ChangeStreamCursor<TSchema, TChange>)
| 956 | |
/**
 * Wires this change stream to a readable stream of changes and re-emits what it produces.
 *
 * Calls `_setIsEmitter()`, then reuses `this.cursorStream` when one is already set or
 * otherwise opens a new one via `cursor.stream()`, and stores the result back on
 * `this.cursorStream`. Two listeners are attached to that stream:
 *
 * - `'data'`: the raw change is passed through `_processChange` and the result is emitted as
 *   `ChangeStream.CHANGE`. Anything thrown while processing or emitting is emitted as
 *   `ChangeStream.ERROR` instead. Afterwards `timeoutContext` is refreshed, if present.
 * - `'error'`: forwarded to `_processErrorStreamMode`, together with a flag telling whether
 *   `this.cursor.id` is currently non-null.
 *
 * @param cursor - cursor used to open a stream when `this.cursorStream` is not already set
 * @internal
 */
private _streamEvents(cursor: ChangeStreamCursor<TSchema, TChange>): void {
  this._setIsEmitter();

  const source = this.cursorStream ?? cursor.stream();
  this.cursorStream = source;

  source.on('data', rawChange => {
    try {
      // The emit stays inside the try block so that failures raised while emitting are
      // reported through the ERROR event just like processing failures.
      this.emit(ChangeStream.CHANGE, this._processChange(rawChange));
    } catch (failure) {
      this.emit(ChangeStream.ERROR, failure);
    }
    this.timeoutContext?.refresh();
  });

  source.on('error', streamError => {
    // Reads `this.cursor` (not the `cursor` argument) at the time the error fires.
    const cursorHasId = this.cursor.id != null;
    return this._processErrorStreamMode(streamError, cursorHasId);
  });
}
| 973 | |
| 974 | /** @internal */ |
| 975 | private _endStream(): void { |
no test coverage detected