* @internal * * @param parent - The parent object that created this change stream * @param pipeline - An array of {@link https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents
(
parent: OperationParent,
pipeline: Document[] = [],
options: ChangeStreamOptions = {}
)
| 639 | * @param pipeline - An array of {@link https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents |
| 640 | */ |
| 641 | constructor( |
| 642 | parent: OperationParent, |
| 643 | pipeline: Document[] = [], |
| 644 | options: ChangeStreamOptions = {} |
| 645 | ) { |
| 646 | super(); |
| 647 | |
| 648 | this.pipeline = pipeline; |
| 649 | this.options = { ...options }; |
| 650 | let serverSelectionTimeoutMS: number; |
| 651 | delete this.options.writeConcern; |
| 652 | |
| 653 | if (parent instanceof Collection) { |
| 654 | this.type = CHANGE_DOMAIN_TYPES.COLLECTION; |
| 655 | serverSelectionTimeoutMS = parent.s.db.client.options.serverSelectionTimeoutMS; |
| 656 | } else if (parent instanceof Db) { |
| 657 | this.type = CHANGE_DOMAIN_TYPES.DATABASE; |
| 658 | serverSelectionTimeoutMS = parent.client.options.serverSelectionTimeoutMS; |
| 659 | } else if (parent instanceof MongoClient) { |
| 660 | this.type = CHANGE_DOMAIN_TYPES.CLUSTER; |
| 661 | serverSelectionTimeoutMS = parent.options.serverSelectionTimeoutMS; |
| 662 | } else { |
| 663 | throw new MongoChangeStreamError( |
| 664 | 'Parent provided to ChangeStream constructor must be an instance of Collection, Db, or MongoClient' |
| 665 | ); |
| 666 | } |
| 667 | |
| 668 | this.contextOwner = Symbol(); |
| 669 | this.parent = parent; |
| 670 | this.namespace = parent.s.namespace; |
| 671 | if (!this.options.readPreference && parent.readPreference) { |
| 672 | this.options.readPreference = parent.readPreference; |
| 673 | } |
| 674 | |
| 675 | // Create contained Change Stream cursor |
| 676 | this.cursor = this._createChangeStreamCursor(options); |
| 677 | |
| 678 | this.isClosed = false; |
| 679 | this.mode = false; |
| 680 | |
| 681 | // Listen for any `change` listeners being added to ChangeStream |
| 682 | this.on('newListener', eventName => { |
| 683 | if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) { |
| 684 | this._streamEvents(this.cursor); |
| 685 | } |
| 686 | }); |
| 687 | |
| 688 | this.on('removeListener', eventName => { |
| 689 | if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) { |
| 690 | this.cursorStream?.removeAllListeners('data'); |
| 691 | } |
| 692 | }); |
| 693 | |
| 694 | if (this.options.timeoutMS != null) { |
| 695 | this.timeoutContext = new CSOTTimeoutContext({ |
| 696 | timeoutMS: this.options.timeoutMS, |
| 697 | serverSelectionTimeoutMS |
| 698 | }); |
Nothing calls this directly.
No test coverage detected.