MCPcopy
hub / github.com/celery/celery / test_revoked_expires_expired

Method test_revoked_expires_expired

t/unit/worker/test_request.py:669–679  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

667 assert job._already_cancelled is True
668
def test_revoked_expires_expired(self):
    """Check ``revoked()`` on a request whose ``expires`` is already past.

    The request is built with an ``expires`` value one day before the
    current UTC time.  Calling ``revoked()`` must send ``task_revoked``
    with ``expired=True`` (``terminated=False``, ``signum=None``), add
    the request id to ``revoked``, and leave the backend status for
    that id equal to ``states.REVOKED``.
    """
    # Timezone-aware expiry that lies one day in the past.
    already_expired = datetime.now(timezone.utc) - timedelta(days=1)
    signature = self.mytask.s(1, f='x').set(expires=already_expired)
    job = self.get_request(signature)

    # Keyword arguments the ``task_revoked`` signal is expected to carry.
    expected_signal = dict(
        sender=job.task,
        request=job._context,
        terminated=False,
        expired=True,
        signum=None,
    )
    with self.assert_signal_called(task_revoked, **expected_signal):
        job.revoked()

    assert job.id in revoked
    # NOTE(review): presumably makes this app current so the backend
    # lookup below resolves against it -- confirm.
    self.app.set_current()
    assert self.mytask.backend.get_status(job.id) == states.REVOKED
680
681 def test_revoked_expires_not_expired(self):
682 job = self.xRequest(

Callers

nothing calls this directly

Calls 6

get_request · Method · 0.95
set_current · Method · 0.80
set · Method · 0.45
s · Method · 0.45
now · Method · 0.45
revoked · Method · 0.45

Tested by

no test coverage detected