Create a lock to prevent premature directory cleanup.
(p: Path)
| 243 | |
| 244 | |
def create_cleanup_lock(p: Path) -> Path:
    """Create a lock to prevent premature directory cleanup.

    The lock is a file at the path returned by ``get_lock_path(p)``; the
    current process id is written into it as ASCII decimal digits.

    Args:
        p: Path handed to ``get_lock_path`` to derive the lock file location;
            it is also named in the error message when the lock already exists.

    Returns:
        The path of the newly created lock file.

    Raises:
        OSError: If the lock file already exists, or if the lock path is not
            a regular file right after it was written. Other ``OSError``s
            raised by ``os.open``/``os.write`` propagate unchanged.
    """
    lock_path = get_lock_path(p)
    try:
        # O_CREAT | O_EXCL makes os.open fail with FileExistsError when the
        # lock file is already present, instead of silently reusing it.
        fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
    except FileExistsError as e:
        raise OSError(f"cannot create lockfile in {p}") from e

    try:
        os.write(fd, str(os.getpid()).encode())
    finally:
        # Always release the descriptor, even if the write fails.
        os.close(fd)

    # Re-check after closing: the path may have been moved away in between.
    if not lock_path.is_file():
        raise OSError("lock path got renamed after successful creation")
    return lock_path
| 260 | |
| 261 | |
| 262 | def register_cleanup_lock_removal(lock_path: Path, register: Any) -> Any: |