Retry callback with exponential backoff when it raises OSError. If the function still generates an error after max_wait seconds, propagate the exception. This can be effective against random file system operation failures on Windows.
(func: Callable[[], Any], max_wait: float = 1.0)
| 330 | |
| 331 | |
| 332 | def retry_on_error(func: Callable[[], Any], max_wait: float = 1.0) -> None: |
| 333 | """Retry callback with exponential backoff when it raises OSError. |
| 334 | |
| 335 | If the function still generates an error after max_wait seconds, propagate |
| 336 | the exception. |
| 337 | |
| 338 | This can be effective against random file system operation failures on |
| 339 | Windows. |
| 340 | """ |
| 341 | t0 = time.time() |
| 342 | wait_time = 0.01 |
| 343 | while True: |
| 344 | try: |
| 345 | func() |
| 346 | return |
| 347 | except OSError: |
| 348 | wait_time = min(wait_time * 2, t0 + max_wait - time.time()) |
| 349 | if wait_time <= 0.01: |
| 350 | # Done enough waiting, the error seems persistent. |
| 351 | raise |
| 352 | time.sleep(wait_time) |
| 353 | |
| 354 | |
| 355 | def good_repr(obj: object) -> str: |
no test coverage detected
searching dependent graphs…