Read the next message from the stream.
(self)
| 415 | DirtyProtocol.write_message(self.client._sock, request) |
| 416 | |
| 417 | def _read_next_chunk(self): |
| 418 | """Read the next message from the stream.""" |
| 419 | with self.client._lock: |
| 420 | # Check total stream deadline |
| 421 | now = time.monotonic() |
| 422 | if now >= self._deadline: |
| 423 | self._exhausted = True |
| 424 | raise DirtyTimeoutError( |
| 425 | "Stream exceeded total timeout", |
| 426 | timeout=self.client.timeout |
| 427 | ) |
| 428 | |
| 429 | remaining = self._deadline - now |
| 430 | |
| 431 | # Set socket timeout based on remaining time |
| 432 | # Fast path: use larger timeout when plenty of time remains |
| 433 | if remaining > self._TIMEOUT_THRESHOLD: |
| 434 | read_timeout = self._TIMEOUT_THRESHOLD |
| 435 | else: |
| 436 | read_timeout = min(remaining, self._idle_timeout) |
| 437 | |
| 438 | try: |
| 439 | self.client._sock.settimeout(read_timeout) |
| 440 | response = DirtyProtocol.read_message(self.client._sock) |
| 441 | except socket.timeout: |
| 442 | # Check which timeout was hit |
| 443 | now = time.monotonic() |
| 444 | if now >= self._deadline: |
| 445 | self._exhausted = True |
| 446 | raise DirtyTimeoutError( |
| 447 | "Stream exceeded total timeout", |
| 448 | timeout=self.client.timeout |
| 449 | ) |
| 450 | idle_duration = now - self._last_chunk_time |
| 451 | self._exhausted = True |
| 452 | raise DirtyTimeoutError( |
| 453 | f"Timeout waiting for next chunk (idle {idle_duration:.1f}s)", |
| 454 | timeout=self._idle_timeout |
| 455 | ) |
| 456 | except Exception as e: |
| 457 | self._exhausted = True |
| 458 | self.client._close_socket() |
| 459 | raise DirtyConnectionError(f"Communication error: {e}") from e |
| 460 | |
| 461 | # Update last chunk time for idle tracking |
| 462 | self._last_chunk_time = time.monotonic() |
| 463 | |
| 464 | msg_type = response.get("type") |
| 465 | |
| 466 | # Chunk message - return the data |
| 467 | if msg_type == DirtyProtocol.MSG_TYPE_CHUNK: |
| 468 | return response.get("data") |
| 469 | |
| 470 | # End message - stop iteration |
| 471 | if msg_type == DirtyProtocol.MSG_TYPE_END: |
| 472 | self._exhausted = True |
| 473 | raise StopIteration |
| 474 |
no test coverage detected