hub / github.com/celery/celery / test_expired

Method test_expired

t/integration/test_tasks.py:169–193

Testing expiration of task.

(self, manager)


@pytest.mark.timeout(60)
@flaky
def test_expired(self, manager):
    """Testing expiration of task."""
    # Fill the queue with tasks which took > 1 sec to process
    for _ in range(4):
        sleeping.delay(2)
    # Execute task with expiration = 1 sec
    result = add.apply_async((1, 1), expires=1)
    with pytest.raises(celery.exceptions.TaskRevokedError):
        result.get()
    assert result.status == 'REVOKED'
    assert result.ready() is True
    assert result.failed() is False
    assert result.successful() is False

    # Fill the queue with tasks which took > 1 sec to process
    for _ in range(4):
        sleeping.delay(2)
    # Execute task with expiration at now + 1 sec
    result = add.apply_async((1, 1), expires=datetime.now(timezone.utc) + timedelta(seconds=1))
    with pytest.raises(celery.exceptions.TaskRevokedError):
        result.get()
    assert result.status == 'REVOKED'
    assert result.ready() is True
    assert result.failed() is False
    assert result.successful() is False
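
The test passes `expires` in two forms: a relative number of seconds and an absolute timezone-aware datetime, and expects the same outcome for both. The following is a minimal standard-library sketch of why the two forms are equivalent and why the task ends up past its deadline. `normalize_expires` and `is_expired` are hypothetical helpers written for illustration, not Celery functions, and the 8-second pickup delay assumes the four 2-second tasks run one after another on a single worker slot.

```python
from datetime import datetime, timedelta, timezone


def normalize_expires(expires, now):
    """Return an absolute UTC deadline for either form of `expires`.

    Hypothetical helper for illustration; not part of Celery's API.
    """
    if isinstance(expires, (int, float)):
        # Relative form: seconds counted from the moment of publishing.
        return now + timedelta(seconds=expires)
    if expires.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Absolute form: express the deadline in UTC so it can be compared.
    return expires.astimezone(timezone.utc)


def is_expired(deadline, picked_up_at):
    """Return True if the task is picked up after its deadline."""
    return picked_up_at > deadline


# Fixed publish time so the example is deterministic.
published = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

# Both forms used by the test describe the same deadline.
relative = normalize_expires(1, now=published)
absolute = normalize_expires(published + timedelta(seconds=1), now=published)

# Assumption: four 2-second tasks ahead in the queue, processed serially,
# delay pickup of the expiring task by about 8 seconds.
picked_up_at = published + timedelta(seconds=8)
```

Under these assumptions the task is picked up long after its one-second deadline, which is the situation in which the test expects `result.get()` to raise `TaskRevokedError` and the status to read `'REVOKED'`.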

Callers

nothing calls this directly

Calls 8

delay · Method · 0.45
apply_async · Method · 0.45
raises · Method · 0.45
get · Method · 0.45
ready · Method · 0.45
failed · Method · 0.45
successful · Method · 0.45
now · Method · 0.45

Tested by

no test coverage detected