| 64 | self.create_missing = create_missing |
| 65 | |
| 66 | def route(self, options, name, args=(), kwargs=None, task_type=None): |
| 67 | kwargs = {} if not kwargs else kwargs |
| 68 | options = self.expand_destination(options) # expands 'queue' |
| 69 | if self.routes: |
| 70 | route = self.lookup_route(name, args, kwargs, options, task_type) |
| 71 | if route: # expands 'queue' in route. |
| 72 | return lpmerge(self.expand_destination(route), options) |
| 73 | if 'queue' not in options: |
| 74 | options = lpmerge(self.expand_destination( |
| 75 | self.app.conf.task_default_queue), options) |
| 76 | return options |
| 77 | |
| 78 | def expand_destination(self, route): |
| 79 | # Route can be a queue name: convenient for direct exchanges. |