Testing expiration of task.
(self, manager)
| 167 | @pytest.mark.timeout(60) |
| 168 | @flaky |
| 169 | def test_expired(self, manager): |
| 170 | """Testing expiration of task.""" |
| 171 | # Fill the queue with tasks which took > 1 sec to process |
| 172 | for _ in range(4): |
| 173 | sleeping.delay(2) |
| 174 | # Execute task with expiration = 1 sec |
| 175 | result = add.apply_async((1, 1), expires=1) |
| 176 | with pytest.raises(celery.exceptions.TaskRevokedError): |
| 177 | result.get() |
| 178 | assert result.status == 'REVOKED' |
| 179 | assert result.ready() is True |
| 180 | assert result.failed() is False |
| 181 | assert result.successful() is False |
| 182 | |
| 183 | # Fill the queue with tasks which took > 1 sec to process |
| 184 | for _ in range(4): |
| 185 | sleeping.delay(2) |
| 186 | # Execute task with expiration at now + 1 sec |
| 187 | result = add.apply_async((1, 1), expires=datetime.now(timezone.utc) + timedelta(seconds=1)) |
| 188 | with pytest.raises(celery.exceptions.TaskRevokedError): |
| 189 | result.get() |
| 190 | assert result.status == 'REVOKED' |
| 191 | assert result.ready() is True |
| 192 | assert result.failed() is False |
| 193 | assert result.successful() is False |
| 194 | |
| 195 | @flaky |
| 196 | def test_eta(self, manager): |
nothing calls this directly
no test coverage detected