Find tasks by filtering them and move the tasks to a new queue. Arguments: predicate (Callable): Filter function used to decide the messages to move. Must accept the standard signature of ``(body, message)`` used by Kombu consumer callbacks. If the predicate wa
(predicate, connection=None, exchange=None, routing_key=None,
source=None, app=None, callback=None, limit=None, transform=None,
**kwargs)
| 125 | |
| 126 | |
| 127 | def move(predicate, connection=None, exchange=None, routing_key=None, |
| 128 | source=None, app=None, callback=None, limit=None, transform=None, |
| 129 | **kwargs): |
| 130 | """Find tasks by filtering them and move the tasks to a new queue. |
| 131 | |
| 132 | Arguments: |
| 133 | predicate (Callable): Filter function used to decide the messages |
| 134 | to move. Must accept the standard signature of ``(body, message)`` |
| 135 | used by Kombu consumer callbacks. If the predicate wants the |
| 136 | message to be moved it must return either: |
| 137 | |
| 138 | 1) a tuple of ``(exchange, routing_key)``, or |
| 139 | |
| 140 | 2) a :class:`~kombu.entity.Queue` instance, or |
| 141 | |
| 142 | 3) any other true value means the specified |
| 143 | ``exchange`` and ``routing_key`` arguments will be used. |
| 144 | connection (kombu.Connection): Custom connection to use. |
| 145 | source: List[Union[str, kombu.Queue]]: Optional list of source |
| 146 | queues to use instead of the default (queues |
| 147 | in :setting:`task_queues`). This list can also contain |
| 148 | :class:`~kombu.entity.Queue` instances. |
| 149 | exchange (str, kombu.Exchange): Default destination exchange. |
| 150 | routing_key (str): Default destination routing key. |
| 151 | limit (int): Limit number of messages to filter. |
| 152 | callback (Callable): Callback called after message moved, |
| 153 | with signature ``(state, body, message)``. |
| 154 | transform (Callable): Optional function to transform the return |
| 155 | value (destination) of the filter function. |
| 156 | |
| 157 | Also supports the same keyword arguments as :func:`start_filter`. |
| 158 | |
| 159 | To demonstrate, the :func:`move_task_by_id` operation can be implemented |
| 160 | like this: |
| 161 | |
| 162 | .. code-block:: python |
| 163 | |
| 164 | def is_wanted_task(body, message): |
| 165 | if body['id'] == wanted_id: |
| 166 | return Queue('foo', exchange=Exchange('foo'), |
| 167 | routing_key='foo') |
| 168 | |
| 169 | move(is_wanted_task) |
| 170 | |
| 171 | or with a transform: |
| 172 | |
| 173 | .. code-block:: python |
| 174 | |
| 175 | def transform(value): |
| 176 | if isinstance(value, str): |
| 177 | return Queue(value, Exchange(value), value) |
| 178 | return value |
| 179 | |
| 180 | move(is_wanted_task, transform=transform) |
| 181 | |
| 182 | Note: |
| 183 | The predicate may also return a tuple of ``(exchange, routing_key)`` |
| 184 | to specify the destination to where the task should be moved, |