(
instance: any,
source$: Observable<T>,
)
| 159 | } |
| 160 | |
| 161 | public mergeDisconnectEvent<T = any>( |
| 162 | instance: any, |
| 163 | source$: Observable<T>, |
| 164 | ): Observable<T> { |
| 165 | const eventToError = (eventType: string) => |
| 166 | fromEvent(instance, eventType).pipe( |
| 167 | map((err: unknown) => { |
| 168 | throw err; |
| 169 | }), |
| 170 | ); |
| 171 | const disconnect$ = eventToError(RmqEventsMap.DISCONNECT); |
| 172 | |
| 173 | const urls = this.getOptionsProp(this.options, 'urls', []); |
| 174 | const connectFailedEventKey = 'connectFailed'; |
| 175 | const connectFailed$ = eventToError(connectFailedEventKey).pipe( |
| 176 | retryWhen(e => |
| 177 | e.pipe( |
| 178 | scan((errorCount, error: any) => { |
| 179 | if (urls.indexOf(error.url) >= urls.length - 1) { |
| 180 | throw error; |
| 181 | } |
| 182 | return errorCount + 1; |
| 183 | }, 0), |
| 184 | ), |
| 185 | ), |
| 186 | ); |
| 187 | // If we ever decide to propagate all disconnect errors & re-emit them through |
| 188 | // the "connection" stream then comment out "first()" operator. |
| 189 | return merge(source$, disconnect$, connectFailed$).pipe(first()); |
| 190 | } |
| 191 | |
| 192 | public async convertConnectionToPromise() { |
| 193 | try { |
no test coverage detected