Task simulating multiple retries. When return_value is provided, the task returns it after several retries; otherwise it fails.
(self, return_value=None)
| 212 | |
@shared_task(bind=True, default_retry_delay=1)
def retry(self, return_value=None):
    """Task simulating multiple retries.

    When ``return_value`` is provided (anything other than ``None``), the
    task asks to be retried while its ``attempt`` counter is below 3 and
    returns ``return_value`` once the counter has reached 3.  Otherwise it
    fails: every run only requests another retry with
    ``ExpectedException``.

    The counter is kept as an ``attempt`` attribute on the bound task
    object (``self``) and is removed again just before the value is
    returned.
    NOTE(review): this presumes the same task object is seen by each
    retry run -- confirm against the worker setup in use.

    :param return_value: value to return after the simulated retries;
        ``None`` (the default) selects the always-retry path.
    :returns: ``return_value`` once the counter has reached 3.
    :raises: whatever ``self.retry(...)`` returns, on every other run.
    """
    # Compare against None instead of relying on truthiness, so falsy
    # results such as 0, '' or [] still count as "provided".
    if return_value is not None:
        attempt = getattr(self, 'attempt', 0)
        print('attempt', attempt)
        if attempt >= 3:
            # Drop the counter so a later call starts again from zero.
            delattr(self, 'attempt')
            return return_value
        self.attempt = attempt + 1

    # An explicit countdown=5 is passed here even though the decorator
    # sets default_retry_delay=1.
    raise self.retry(exc=ExpectedException(), countdown=5)
| 229 | |
| 230 | |
| 231 | @shared_task(bind=True, default_retry_delay=1) |
nothing calls this directly
no test coverage detected