MCPcopy
hub / github.com/celery/celery / assert_routes_to_queue

Method assert_routes_to_queue

t/unit/app/test_routes.py:53–62  ·  view source on GitHub ↗
(self, queue, router, name,
                               args=None, kwargs=None, options=None)

Source from the content-addressed store, hash-verified

51 self.mytask = mytask
52
53 def assert_routes_to_queue(self, queue, router, name,
54 args=None, kwargs=None, options=None):
55 if options is None:
56 options = {}
57 if kwargs is None:
58 kwargs = {}
59 if args is None:
60 args = []
61 assert router.route(options, name, args, kwargs)[
62 'queue'].name == queue
63
64 def assert_routes_to_default_queue(self, router, name, *args, **kwargs):
65 self.assert_routes_to_queue(

Calls 1

routeMethod · 0.80

Tested by

no test coverage detected