Inspect queue bindings.
(ctx)
| 19 | @list_.command(cls=CeleryCommand) |
| 20 | @click.pass_context |
| 21 | def bindings(ctx): |
| 22 | """Inspect queue bindings.""" |
| 23 | # TODO: Consider using a table formatter for this command. |
| 24 | app = ctx.obj.app |
| 25 | with app.connection() as conn: |
| 26 | app.amqp.TaskConsumer(conn).declare() |
| 27 | |
| 28 | try: |
| 29 | bindings = conn.manager.get_bindings() |
| 30 | except NotImplementedError: |
| 31 | raise click.UsageError('Your transport cannot list bindings.') |
| 32 | |
| 33 | def fmt(q, e, r): |
| 34 | ctx.obj.echo(f'{q:<28} {e:<28} {r}') |
| 35 | fmt('Queue', 'Exchange', 'Routing Key') |
| 36 | fmt('-' * 16, '-' * 16, '-' * 16) |
| 37 | for b in bindings: |
| 38 | fmt(b['destination'], b['source'], b['routing_key']) |
nothing calls this directly
no test coverage detected