MCPcopy
hub / github.com/mongodb/node-mongodb-native / constructor

Method constructor

src/change_stream.ts:641–700  ·  view source on GitHub ↗

* @internal * * @param parent - The parent object that created this change stream * @param pipeline - An array of {@link https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents

(
    parent: OperationParent,
    pipeline: Document[] = [],
    options: ChangeStreamOptions = {}
  )

Source from the content-addressed store, hash-verified

639 * @param pipeline - An array of {@link https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents
640 */
641 constructor(
642 parent: OperationParent,
643 pipeline: Document[] = [],
644 options: ChangeStreamOptions = {}
645 ) {
646 super();
647
648 this.pipeline = pipeline;
649 this.options = { ...options };
650 let serverSelectionTimeoutMS: number;
651 delete this.options.writeConcern;
652
653 if (parent instanceof Collection) {
654 this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
655 serverSelectionTimeoutMS = parent.s.db.client.options.serverSelectionTimeoutMS;
656 } else if (parent instanceof Db) {
657 this.type = CHANGE_DOMAIN_TYPES.DATABASE;
658 serverSelectionTimeoutMS = parent.client.options.serverSelectionTimeoutMS;
659 } else if (parent instanceof MongoClient) {
660 this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
661 serverSelectionTimeoutMS = parent.options.serverSelectionTimeoutMS;
662 } else {
663 throw new MongoChangeStreamError(
664 'Parent provided to ChangeStream constructor must be an instance of Collection, Db, or MongoClient'
665 );
666 }
667
668 this.contextOwner = Symbol();
669 this.parent = parent;
670 this.namespace = parent.s.namespace;
671 if (!this.options.readPreference && parent.readPreference) {
672 this.options.readPreference = parent.readPreference;
673 }
674
675 // Create contained Change Stream cursor
676 this.cursor = this._createChangeStreamCursor(options);
677
678 this.isClosed = false;
679 this.mode = false;
680
681 // Listen for any `change` listeners being added to ChangeStream
682 this.on('newListener', eventName => {
683 if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
684 this._streamEvents(this.cursor);
685 }
686 });
687
688 this.on('removeListener', eventName => {
689 if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) {
690 this.cursorStream?.removeAllListeners('data');
691 }
692 });
693
694 if (this.options.timeoutMS != null) {
695 this.timeoutContext = new CSOTTimeoutContext({
696 timeoutMS: this.options.timeoutMS,
697 serverSelectionTimeoutMS
698 });

Callers

nothing calls this directly

Calls 5

_streamEventsMethod · 0.95
onMethod · 0.80
listenerCountMethod · 0.80
removeAllListenersMethod · 0.80

Tested by

no test coverage detected