| 1126 | } |
| 1127 | |
| 1128 | private _readNext() { |
| 1129 | if (this._cursor.id === Long.ZERO) { |
| 1130 | this.push(null); |
| 1131 | return; |
| 1132 | } |
| 1133 | |
| 1134 | this._cursor |
| 1135 | .next() |
| 1136 | .then( |
| 1137 | // result from next() |
| 1138 | result => { |
| 1139 | if (result == null) { |
| 1140 | this.push(null); |
| 1141 | } else if (this.destroyed) { |
| 1142 | this._cursor.close().then(undefined, squashError); |
| 1143 | } else { |
| 1144 | if (this.push(result)) { |
| 1145 | return this._readNext(); |
| 1146 | } |
| 1147 | |
| 1148 | this._readInProgress = false; |
| 1149 | } |
| 1150 | }, |
| 1151 | // error from next() |
| 1152 | err => { |
| 1153 | // NOTE: This is questionable, but we have a test backing the behavior. It seems the |
| 1154 | // desired behavior is that a stream ends cleanly when a user explicitly closes |
| 1155 | // a client during iteration. Alternatively, we could do the "right" thing and |
| 1156 | // propagate the error message by removing this special case. |
| 1157 | if (err.message.match(/server is closed/)) { |
| 1158 | this._cursor.close().then(undefined, squashError); |
| 1159 | return this.push(null); |
| 1160 | } |
| 1161 | |
| 1162 | // NOTE: This is also perhaps questionable. The rationale here is that these errors tend |
| 1163 | // to be "operation was interrupted", where a cursor has been closed but there is an |
| 1164 | // active getMore in-flight. This used to check if the cursor was killed but once |
| 1165 | // that changed to happen in cleanup legitimate errors would not destroy the |
| 1166 | // stream. There are change streams test specifically test these cases. |
| 1167 | if (err.message.match(/operation was interrupted/)) { |
| 1168 | return this.push(null); |
| 1169 | } |
| 1170 | |
| 1171 | // NOTE: The two above checks on the message of the error will cause a null to be pushed |
| 1172 | // to the stream, thus closing the stream before the destroy call happens. This means |
| 1173 | // that either of those error messages on a change stream will not get a proper |
| 1174 | // 'error' event to be emitted (the error passed to destroy). Change stream resumability |
| 1175 | // relies on that error event to be emitted to create its new cursor and thus was not |
| 1176 | // working on 4.4 servers because the error emitted on failover was "interrupted at |
| 1177 | // shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down". |
| 1178 | // See NODE-4475. |
| 1179 | return this.destroy(err); |
| 1180 | } |
| 1181 | ) |
| 1182 | // if either of the above handlers throw |
| 1183 | .catch(error => { |
| 1184 | this._readInProgress = false; |
| 1185 | this.destroy(error); |