MCPcopy
hub / github.com/celery/celery / test_get_with_retries

Method test_get_with_retries

t/unit/backends/test_base.py:1488–1506  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

1486 self.app.conf.result_backend_always_retry = prev
1487
def test_get_with_retries(self):
    """Check that ``get_task_meta`` retries after a retryable backend error.

    With ``result_backend_always_retry`` switched on, the first call to
    ``_get_task_meta_for`` raises and the second returns a SUCCESS meta.
    The test expects that meta to be returned, exactly one ``_sleep`` call
    and exactly one ``on_backend_retryable_error`` notification. The
    original configuration value is restored on exit, pass or fail.
    """
    # Remember the current setting before flipping it on for this test.
    saved_always_retry = self.app.conf.result_backend_always_retry
    self.app.conf.result_backend_always_retry = True

    try:
        expected_meta = {'status': states.SUCCESS, 'result': 42}

        backend = BaseBackend(app=self.app)
        # Treat every exception as safe to retry.
        backend.exception_safe_to_retry = lambda exc: True
        # Stub out sleeping so the retry does not actually wait.
        backend._sleep = Mock()
        backend.on_backend_retryable_error = Mock()
        # First lookup fails, second lookup yields the successful meta.
        backend._get_task_meta_for = Mock(
            side_effect=[
                Exception("failed"),
                dict(expected_meta),
            ],
        )

        meta = backend.get_task_meta(sentinel.task_id)

        assert meta == expected_meta
        assert backend._sleep.call_count == 1
        backend.on_backend_retryable_error.assert_called_once()
    finally:
        self.app.conf.result_backend_always_retry = saved_always_retry
1507
1508 def test_get_reaching_max_retries(self):
1509 self.app.conf.result_backend_always_retry, prev = True, self.app.conf.result_backend_always_retry

Callers

nothing calls this directly

Calls 2

BaseBackend (Class) · 0.90
get_task_meta (Method) · 0.45

Tested by

no test coverage detected