MCPcopy
hub / github.com/nestjs/nest / mergeDisconnectEvent

Method mergeDisconnectEvent

packages/microservices/client/client-rmq.ts:161–190  ·  view source on GitHub ↗
(
    instance: any,
    source$: Observable<T>,
  )

Source from the content-addressed store, hash-verified

159 }
160
/**
 * Merges a source stream with the failure events emitted by `instance`,
 * and lets only the first notification of the merged stream through.
 *
 * - Every `RmqEventsMap.DISCONNECT` event is rethrown, so it surfaces as a
 *   stream error.
 * - Every "connectFailed" event is rethrown as well, but the stream is
 *   retried until the position of the failing `url` inside the configured
 *   `urls` option is at (or past) the last index; that error is propagated.
 *
 * @param instance event emitter handed to `fromEvent`
 * @param source$ stream to merge with the failure streams
 * @returns the merged stream, limited by `first()`
 */
public mergeDisconnectEvent<T = any>(
  instance: any,
  source$: Observable<T>,
): Observable<T> {
  // Builds a stream that throws the payload of each `eventType` emission.
  const rethrowEvent = (eventType: string) => {
    const event$ = fromEvent(instance, eventType);
    return event$.pipe(
      map((payload: unknown) => {
        throw payload;
      }),
    );
  };

  const disconnect$ = rethrowEvent(RmqEventsMap.DISCONNECT);

  const configuredUrls = this.getOptionsProp(this.options, 'urls', []);
  const connectFailed$ = rethrowEvent('connectFailed').pipe(
    retryWhen(failures$ =>
      failures$.pipe(
        scan((failureCount, failure: any) => {
          // Stop retrying once the failing url is the last configured one.
          const failedIndex = configuredUrls.indexOf(failure.url);
          const lastIndex = configuredUrls.length - 1;
          if (failedIndex >= lastIndex) {
            throw failure;
          }
          return failureCount + 1;
        }, 0),
      ),
    ),
  );

  const merged$ = merge(source$, disconnect$, connectFailed$);
  // "first()" is what stops later disconnect errors from being re-emitted
  // through the "connection" stream; drop it if they should all propagate.
  return merged$.pipe(first());
}
191
192 public async convertConnectionToPromise() {
193 try {

Callers 2

connect Method · 0.95
client-rmq.spec.ts File · 0.80

Calls 1

pipe Method · 0.45

Tested by

no test coverage detected