(self)
| 1486 | self.app.conf.result_backend_always_retry = prev |
| 1487 | |
| 1488 | def test_get_with_retries(self): |
| 1489 | self.app.conf.result_backend_always_retry, prev = True, self.app.conf.result_backend_always_retry |
| 1490 | |
| 1491 | try: |
| 1492 | b = BaseBackend(app=self.app) |
| 1493 | b.exception_safe_to_retry = lambda exc: True |
| 1494 | b._sleep = Mock() |
| 1495 | b.on_backend_retryable_error = Mock() |
| 1496 | b._get_task_meta_for = Mock() |
| 1497 | b._get_task_meta_for.side_effect = [ |
| 1498 | Exception("failed"), |
| 1499 | {'status': states.SUCCESS, 'result': 42} |
| 1500 | ] |
| 1501 | res = b.get_task_meta(sentinel.task_id) |
| 1502 | assert res == {'status': states.SUCCESS, 'result': 42} |
| 1503 | assert b._sleep.call_count == 1 |
| 1504 | b.on_backend_retryable_error.assert_called_once() |
| 1505 | finally: |
| 1506 | self.app.conf.result_backend_always_retry = prev |
| 1507 | |
| 1508 | def test_get_reaching_max_retries(self): |
| 1509 | self.app.conf.result_backend_always_retry, prev = True, self.app.conf.result_backend_always_retry |
nothing calls this directly
no test coverage detected