Receive a single message from a worker, with crash diagnostics.
(self, idx: int)
| 1387 | assert read_tag(buf) == ACK_MESSAGE |
| 1388 | |
| 1389 | def receive_worker_message(self, idx: int) -> ReadBuffer: |
| 1390 | """Receive a single message from a worker, with crash diagnostics.""" |
| 1391 | try: |
| 1392 | return receive(self.workers[idx].conn) |
| 1393 | except OSError as exc: |
| 1394 | try: |
| 1395 | # Give worker process a chance to actually terminate before reporting. |
| 1396 | exit_code = self.workers[idx].proc.wait(timeout=WORKER_SHUTDOWN_TIMEOUT) |
| 1397 | except TimeoutError: |
| 1398 | exit_code = None |
| 1399 | exit_status = f"exit code {exit_code}" if exit_code is not None else "still running" |
| 1400 | raise OSError( |
| 1401 | f"Worker {idx} disconnected before sending data ({exit_status})" |
| 1402 | ) from exc |
| 1403 | |
| 1404 | def submit(self, graph: Graph, sccs: list[SCC]) -> None: |
| 1405 | """Submit a stale SCC for processing in current process or parallel workers.""" |
no test coverage detected