Send the COMMAND control command to the workers. Availability: RabbitMQ (AMQP), Redis, and MongoDB transports.
(ctx, command, timeout, destination, json)
| 234 | @click.pass_context |
| 235 | @handle_preload_options |
def control(ctx, command, timeout, destination, json):
    """Send the COMMAND control command to the workers.

    Availability: RabbitMQ (AMQP), Redis, and MongoDB transports.
    """
    # NOTE(review): the docstring above is kept verbatim on purpose -- this
    # is a click callback, so it is presumably rendered as the command's
    # ``--help`` text. Developer-facing notes therefore live in comments.
    #
    # Parameters:
    #   ctx          click context; ``ctx.obj`` provides ``app`` and ``echo``
    #                and ``ctx.args`` holds the extra command-line arguments.
    #   command      name of the control command to broadcast.
    #   timeout      forwarded unchanged to ``broadcast``.
    #   destination  forwarded unchanged to ``broadcast``.
    #   json         when truthy, replies are not printed one by one; the
    #                collected replies are emitted once through ``dumps``.
    #
    # Raises:
    #   CeleryCommandException (exit code ``EX_UNAVAILABLE``) when the
    #   broadcast yields no replies.
    _verify_command_name('control', command)

    # In JSON mode no per-reply callback is installed; everything is printed
    # in a single ``dumps`` call at the end instead.
    callback = None if json else partial(_say_remote_command_reply, ctx,
                                         show_reply=True)
    arguments = _compile_arguments(command, ctx.args)

    # Bind ``replies`` up front: if ``broadcast`` raises and the error
    # handler returns instead of raising, the check below must still see a
    # defined (empty) value rather than fail with UnboundLocalError.
    replies = None
    try:
        replies = ctx.obj.app.control.broadcast(command, timeout=timeout,
                                                destination=destination,
                                                callback=callback,
                                                reply=True,
                                                arguments=arguments)
    except Exception as exc:
        handle_remote_command_error(f'control {command}', exc)

    if not replies:
        raise CeleryCommandException(
            message='No nodes replied within time constraint',
            exit_code=EX_UNAVAILABLE
        )

    if json:
        ctx.obj.echo(dumps(replies))
nothing calls this directly
no outgoing calls
no test coverage detected