(
channel: string,
buffer: Buffer,
pub: MqttClient,
originalPacket?: Record<string, any>,
)
| 127 | } |
| 128 | |
| 129 | public async handleMessage( |
| 130 | channel: string, |
| 131 | buffer: Buffer, |
| 132 | pub: MqttClient, |
| 133 | originalPacket?: Record<string, any>, |
| 134 | ): Promise<any> { |
| 135 | const rawPacket = this.parseMessage(buffer.toString()); |
| 136 | const packet = await this.deserializer.deserialize(rawPacket, { channel }); |
| 137 | const mqttContext = new MqttContext([channel, originalPacket!]); |
| 138 | if (isUndefined((packet as IncomingRequest).id)) { |
| 139 | return this.handleEvent(channel, packet, mqttContext); |
| 140 | } |
| 141 | const publish = this.getPublisher( |
| 142 | pub, |
| 143 | mqttContext, |
| 144 | (packet as IncomingRequest).id, |
| 145 | ); |
| 146 | const handler = this.getHandlerByPattern(channel); |
| 147 | |
| 148 | if (!handler) { |
| 149 | const status = 'error'; |
| 150 | const noHandlerPacket = { |
| 151 | id: (packet as IncomingRequest).id, |
| 152 | status, |
| 153 | err: NO_MESSAGE_HANDLER, |
| 154 | }; |
| 155 | return publish(noHandlerPacket); |
| 156 | } |
| 157 | return this.onProcessingStartHook( |
| 158 | this.transportId, |
| 159 | mqttContext, |
| 160 | async () => { |
| 161 | const response$ = this.transformToObservable( |
| 162 | await handler(packet.data, mqttContext), |
| 163 | ); |
| 164 | response$ && this.send(response$, publish); |
| 165 | }, |
| 166 | ); |
| 167 | } |
| 168 | |
| 169 | public getPublisher( |
| 170 | client: MqttClient, |
no test coverage detected