(self)
| 667 | assert job._already_cancelled is True |
| 668 | |
def test_revoked_expires_expired(self):
    """Check that a request whose ``expires`` has passed is revoked.

    Builds a request with an expiry one day in the past, then verifies
    that ``revoked()`` sends ``task_revoked`` with ``expired=True`` and
    ``terminated=False``, that the job id lands in ``revoked``, and that
    the result backend reports ``states.REVOKED``.
    """
    # Expiry timestamp one day before "now" (UTC).
    stale_expiry = datetime.now(timezone.utc) - timedelta(days=1)
    signature = self.mytask.s(1, f='x').set(expires=stale_expiry)
    job = self.get_request(signature)

    # Keyword arguments the task_revoked signal is expected to carry.
    expected_signal = dict(
        sender=job.task,
        request=job._context,
        terminated=False,
        expired=True,
        signum=None,
    )
    with self.assert_signal_called(task_revoked, **expected_signal):
        job.revoked()

    assert job.id in revoked

    # NOTE(review): presumably makes self.app the current app so the
    # backend lookup below resolves against it -- confirm.
    self.app.set_current()
    status = self.mytask.backend.get_status(job.id)
    assert status == states.REVOKED
| 680 | |
| 681 | def test_revoked_expires_not_expired(self): |
| 682 | job = self.xRequest( |
nothing calls this directly
no test coverage detected