hub / github.com/celery/celery / test_revoke_from_resultset

Method test_revoke_from_resultset

t/unit/app/test_control.py:538–547
(self)

Source from the content-addressed store, hash-verified

            terminate=False, timeout=None)

    def test_revoke_from_resultset(self):
        self.app.control.revoke = Mock(name='revoke')
        uuids = [uuid() for _ in range(10)]
        r = self.app.GroupResult(
            uuid(), [self.app.AsyncResult(x) for x in uuids])
        r.revoke()
        self.app.control.revoke.assert_called_with(
            uuids,
            connection=None, reply=False, signal=None,
            terminate=False, timeout=None)

    def test_after_fork_clears_mailbox_pool(self):
        amqp = Mock(name='amqp')
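
Standalone sketch

The test replaces app.control.revoke with a Mock, revokes a group of ten results, and checks that one call went out carrying every child id plus the default keyword arguments. The sketch below reproduces that pattern without Celery installed. FakeAsyncResult and FakeGroupResult are hypothetical stand-ins written for illustration, and the way revoke() forwards its arguments is an assumption inferred from the assertion in the test, not a copy of the library's code.

```python
from unittest.mock import Mock
from uuid import uuid4


class FakeAsyncResult:
    """Hypothetical stand-in for a task result; it only carries an id."""

    def __init__(self, id):
        self.id = id


class FakeGroupResult:
    """Hypothetical stand-in for a result set that revokes all members at once."""

    def __init__(self, id, results, control):
        self.id = id
        self.results = results
        self.control = control

    def revoke(self, connection=None, terminate=False, signal=None,
               wait=False, timeout=None):
        # Assumed behaviour: forward every child id in a single control call,
        # mapping `wait` onto the `reply` keyword.
        self.control.revoke(
            [r.id for r in self.results],
            connection=connection, reply=wait, signal=signal,
            terminate=terminate, timeout=timeout)


# Replace the control object with a Mock so no broker is needed.
control = Mock(name='control')
uuids = [str(uuid4()) for _ in range(10)]
group = FakeGroupResult(
    str(uuid4()), [FakeAsyncResult(x) for x in uuids], control)
group.revoke()

# Same assertion shape as the original test: one call, all ids, defaults only.
control.revoke.assert_called_with(
    uuids,
    connection=None, reply=False, signal=None,
    terminate=False, timeout=None)
```

Mocking the control method keeps the check focused on which arguments the group passes along, so the example needs neither a broker nor a running worker.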

Callers

nothing calls this directly

Calls 3

GroupResult · Method · 0.80
AsyncResult · Method · 0.45
revoke · Method · 0.45

Tested by

no test coverage detected