MCPcopy
hub / github.com/celery/celery / move

Function move

celery/contrib/migrate.py:127–215  ·  view source on GitHub ↗

Find tasks by filtering them and move the tasks to a new queue. Arguments: predicate (Callable): Filter function used to decide the messages to move. Must accept the standard signature of ``(body, message)`` used by Kombu consumer callbacks. If the predicate wa

(predicate, connection=None, exchange=None, routing_key=None,
         source=None, app=None, callback=None, limit=None, transform=None,
         **kwargs)

Source from the content-addressed store, hash-verified

125
126
127def move(predicate, connection=None, exchange=None, routing_key=None,
128 source=None, app=None, callback=None, limit=None, transform=None,
129 **kwargs):
130 """Find tasks by filtering them and move the tasks to a new queue.
131
132 Arguments:
133 predicate (Callable): Filter function used to decide the messages
134 to move. Must accept the standard signature of ``(body, message)``
135 used by Kombu consumer callbacks. If the predicate wants the
136 message to be moved it must return either:
137
138 1) a tuple of ``(exchange, routing_key)``, or
139
140 2) a :class:`~kombu.entity.Queue` instance, or
141
142 3) any other true value means the specified
143 ``exchange`` and ``routing_key`` arguments will be used.
144 connection (kombu.Connection): Custom connection to use.
145 source: List[Union[str, kombu.Queue]]: Optional list of source
146 queues to use instead of the default (queues
147 in :setting:`task_queues`). This list can also contain
148 :class:`~kombu.entity.Queue` instances.
149 exchange (str, kombu.Exchange): Default destination exchange.
150 routing_key (str): Default destination routing key.
151 limit (int): Limit number of messages to filter.
152 callback (Callable): Callback called after message moved,
153 with signature ``(state, body, message)``.
154 transform (Callable): Optional function to transform the return
155 value (destination) of the filter function.
156
157 Also supports the same keyword arguments as :func:`start_filter`.
158
159 To demonstrate, the :func:`move_task_by_id` operation can be implemented
160 like this:
161
162 .. code-block:: python
163
164 def is_wanted_task(body, message):
165 if body['id'] == wanted_id:
166 return Queue('foo', exchange=Exchange('foo'),
167 routing_key='foo')
168
169 move(is_wanted_task)
170
171 or with a transform:
172
173 .. code-block:: python
174
175 def transform(value):
176 if isinstance(value, str):
177 return Queue(value, Exchange(value), value)
178 return value
179
180 move(is_wanted_task, transform=transform)
181
182 Note:
183 The predicate may also return a tuple of ``(exchange, routing_key)``
184 to specify the destination to where the task should be moved,

Callers 3

move_contextMethod · 0.90
move_by_idmapFunction · 0.85
move_by_taskmapFunction · 0.85

Calls 4

_maybe_queueFunction · 0.85
start_filterFunction · 0.85
connection_or_acquireMethod · 0.80
StateClass · 0.70

Tested by 1

move_contextMethod · 0.72