Register a cleanup function for removing a lock.
(lock_path: Path, register: Any)
| 260 | |
| 261 | |
def register_cleanup_lock_removal(lock_path: Path, register: Any) -> Any:
    """Hand ``register`` a callback that deletes the lock file.

    The callback can be invoked without arguments: the lock path and the pid
    of the registering process are bound as parameter defaults when this
    function runs.

    :param lock_path: Path of the lock file to remove.
    :param register: Callable that receives the cleanup callback.
    :returns: Whatever ``register`` returns.
    """
    owner_pid = os.getpid()

    def cleanup_on_exit(lock_path: Path = lock_path, original_pid: int = owner_pid) -> None:
        # Only the process that registered the callback removes the lock; a
        # differing pid means we are running in a fork and must leave it be.
        if os.getpid() == original_pid:
            try:
                lock_path.unlink()
            except OSError:
                # Best effort: a failed unlink is deliberately ignored.
                pass

    return register(cleanup_on_exit)
| 277 | |
| 278 | |
| 279 | def maybe_delete_a_numbered_dir(path: Path) -> None: |
no outgoing calls