| 11 | |
| 12 | @Controller() |
| 13 | export class RedisController { |
| 14 | static IS_NOTIFIED = false; |
| 15 | |
| 16 | @Client({ transport: Transport.REDIS }) |
| 17 | client: ClientProxy; |
| 18 | |
| 19 | @Post() |
| 20 | @HttpCode(200) |
| 21 | call(@Query('command') cmd, @Body() data: number[]): Observable<number> { |
| 22 | return this.client.send<number>({ cmd }, data); |
| 23 | } |
| 24 | |
| 25 | @Post('stream') |
| 26 | @HttpCode(200) |
| 27 | stream(@Body() data: number[]): Observable<number> { |
| 28 | return this.client |
| 29 | .send<number>({ cmd: 'streaming' }, data) |
| 30 | .pipe(scan((a, b) => a + b)); |
| 31 | } |
| 32 | |
| 33 | @Post('concurrent') |
| 34 | @HttpCode(200) |
| 35 | concurrent(@Body() data: number[][]): Promise<boolean> { |
| 36 | const send = async (tab: number[]) => { |
| 37 | const expected = tab.reduce((a, b) => a + b); |
| 38 | const result = await lastValueFrom( |
| 39 | this.client.send<number>({ cmd: 'sum' }, tab), |
| 40 | ); |
| 41 | |
| 42 | return result === expected; |
| 43 | }; |
| 44 | return data |
| 45 | .map(async tab => send(tab)) |
| 46 | .reduce(async (a, b) => (await a) && b); |
| 47 | } |
| 48 | |
| 49 | @MessagePattern({ cmd: 'sum' }) |
| 50 | sum(data: number[]): number { |
| 51 | return (data || []).reduce((a, b) => a + b); |
| 52 | } |
| 53 | |
| 54 | @MessagePattern({ cmd: 'asyncSum' }) |
| 55 | async asyncSum(data: number[]): Promise<number> { |
| 56 | return (data || []).reduce((a, b) => a + b); |
| 57 | } |
| 58 | |
| 59 | @MessagePattern({ cmd: 'streamSum' }) |
| 60 | streamSum(data: number[]): Observable<number> { |
| 61 | return of((data || []).reduce((a, b) => a + b)); |
| 62 | } |
| 63 | |
| 64 | @MessagePattern({ cmd: 'streaming' }) |
| 65 | streaming(data: number[]): Observable<number> { |
| 66 | return from(data); |
| 67 | } |
| 68 | |
| 69 | @Post('notify') |
| 70 | async sendNotification(): Promise<any> { |
nothing calls this directly
no test coverage detected