MCPcopy
hub / github.com/celery/celery / wait_until_idle

Method wait_until_idle

celery/contrib/testing/manager.py:211–231  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

209 return res
210
211 def wait_until_idle(self):
212 control = self.app.control
213 with self.app.connection() as connection:
214 # Try to purge the queue before we start
215 # to attempt to avoid interference from other tests
216 while True:
217 count = control.purge(connection=connection)
218 if count == 0:
219 break
220
221 # Wait until worker is idle
222 inspect = control.inspect()
223 inspect.connection = connection
224 while True:
225 try:
226 count = sum(len(t) for t in inspect.active().values())
227 except ContentDisallowed:
228 # test_security_task_done may trigger this exception
229 break
230 if count == 0:
231 break
232
233
234class Manager(ManagerMixin):

Callers 2

managerFunction · 0.80

Calls 4

connectionMethod · 0.45
purgeMethod · 0.45
inspectMethod · 0.45
activeMethod · 0.45

Tested by 2

managerFunction · 0.64