(
callback?: (err?: unknown, ...optionalParams: unknown[]) => void,
)
| 112 | } |
| 113 | |
| 114 | public async start( |
| 115 | callback?: (err?: unknown, ...optionalParams: unknown[]) => void, |
| 116 | ) { |
| 117 | this.server = this.createClient(); |
| 118 | this.server!.once(RmqEventsMap.CONNECT, () => { |
| 119 | if (this.channel) { |
| 120 | return; |
| 121 | } |
| 122 | this._status$.next(RmqStatus.CONNECTED); |
| 123 | this.channel = this.server!.createChannel({ |
| 124 | json: false, |
| 125 | setup: (channel: Channel) => this.setupChannel(channel, callback!), |
| 126 | }); |
| 127 | }); |
| 128 | |
| 129 | const maxConnectionAttempts = this.getOptionsProp( |
| 130 | this.options, |
| 131 | 'maxConnectionAttempts', |
| 132 | INFINITE_CONNECTION_ATTEMPTS, |
| 133 | ); |
| 134 | |
| 135 | this.registerConnectListener(); |
| 136 | this.registerDisconnectListener(); |
| 137 | this.registerBlockedListener(); |
| 138 | this.registerUnblockedListener(); |
| 139 | this.pendingEventListeners.forEach(({ event, callback }) => |
| 140 | this.server!.on(event, callback), |
| 141 | ); |
| 142 | this.pendingEventListeners = []; |
| 143 | |
| 144 | const connectFailedEvent = 'connectFailed'; |
| 145 | this.server!.once( |
| 146 | connectFailedEvent, |
| 147 | async (error: Record<string, unknown>) => { |
| 148 | this._status$.next(RmqStatus.DISCONNECTED); |
| 149 | |
| 150 | this.logger.error(CONNECTION_FAILED_MESSAGE); |
| 151 | if (error?.err) { |
| 152 | this.logger.error(error.err); |
| 153 | } |
| 154 | const isReconnecting = !!this.channel; |
| 155 | if ( |
| 156 | maxConnectionAttempts === INFINITE_CONNECTION_ATTEMPTS || |
| 157 | isReconnecting |
| 158 | ) { |
| 159 | return; |
| 160 | } |
| 161 | if (++this.connectionAttempts === maxConnectionAttempts) { |
| 162 | await this.close(); |
| 163 | callback?.(error.err ?? new Error(CONNECTION_FAILED_MESSAGE)); |
| 164 | } |
| 165 | }, |
| 166 | ); |
| 167 | } |
| 168 | |
| 169 | public createClient<T = any>(): T { |
| 170 | const socketOptions = this.getOptionsProp(this.options, 'socketOptions'); |
no test coverage detected