Read status file. Raise BadStatus if the status file doesn't exist or contains invalid JSON or the JSON is not a dict.
(status_file: str)
| 356 | |
| 357 | |
def read_status(status_file: str) -> dict[str, object]:
    """Read status file.

    Return the decoded contents of the JSON file at *status_file*.

    Raise BadStatus if the status file doesn't exist (including when it
    disappears just before it is opened) or contains invalid JSON or the
    JSON is not a dict.
    """
    if not os.path.isfile(status_file):
        raise BadStatus("No status file found")
    try:
        f = open(status_file)
    except FileNotFoundError as e:
        # The file can be removed between the isfile() check above and this
        # open(); report it the same way as a file that was never there so
        # callers only ever have to handle BadStatus for a missing file.
        raise BadStatus("No status file found") from e
    with f:
        try:
            data = json.load(f)
        except Exception as e:
            # Any failure while reading/decoding is reported as BadStatus,
            # with the original exception chained for debugging.
            raise BadStatus(f"Malformed status file: {e}") from e
    if not isinstance(data, dict):
        raise BadStatus(f"Invalid status file (not a dict): {data}")
    return data
| 374 | |
| 375 | |
| 376 | def ready_to_read(conns: Sequence[IPCBase], timeout: float | None = None) -> list[int]: |
no test coverage detected
searching dependent graphs…