hub / github.com/celery/celery / bindings

Function bindings

celery/bin/list.py:21–38

Inspect queue bindings.

(ctx)

Source from the content-addressed store, hash-verified

@list_.command(cls=CeleryCommand)
@click.pass_context
def bindings(ctx):
    """Inspect queue bindings."""
    # TODO: Consider using a table formatter for this command.
    app = ctx.obj.app
    with app.connection() as conn:
        app.amqp.TaskConsumer(conn).declare()

        try:
            bindings = conn.manager.get_bindings()
        except NotImplementedError:
            raise click.UsageError('Your transport cannot list bindings.')

        def fmt(q, e, r):
            ctx.obj.echo(f'{q:<28} {e:<28} {r}')
        fmt('Queue', 'Exchange', 'Routing Key')
        fmt('-' * 16, '-' * 16, '-' * 16)
        for b in bindings:
            fmt(b['destination'], b['source'], b['routing_key'])
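
The column layout produced by the nested fmt helper can be tried without a broker. The following is a minimal sketch, not part of Celery: format_bindings and the sample data are hypothetical names introduced for illustration, and the dictionary keys (destination, source, routing_key) are assumed to match those read by the command above.

```python
def format_bindings(bindings):
    """Return the header, a rule, and one row per binding as strings.

    Hypothetical helper: it mirrors the f-string layout of the nested
    ``fmt`` function above, but collects lines instead of echoing them.
    """
    def fmt(q, e, r):
        # Queue and exchange are left-aligned and padded to 28 characters;
        # the routing key is printed as-is at the end of the line.
        return f'{q:<28} {e:<28} {r}'

    lines = [
        fmt('Queue', 'Exchange', 'Routing Key'),
        fmt('-' * 16, '-' * 16, '-' * 16),
    ]
    lines.extend(
        fmt(b['destination'], b['source'], b['routing_key'])
        for b in bindings
    )
    return lines


# Made-up sample data; in the real command the list comes from
# conn.manager.get_bindings().
sample = [
    {'destination': 'celery', 'source': 'celery', 'routing_key': 'celery'},
]

for line in format_bindings(sample):
    print(line)
```

Because the padding is fixed at 28 characters, queue or exchange names longer than that push the following columns to the right, which is presumably what the TODO about a table formatter refers to. In a project, the command itself is typically invoked as celery -A proj list bindings, where proj stands in for the application module.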

Callers

nothing calls this directly

Calls 3

fmt · Function · 0.85
TaskConsumer · Method · 0.80
connection · Method · 0.45

Tested by

no test coverage detected