Read the next message from the stream.
(self)
| 562 | _TIMEOUT_THRESHOLD = 5.0 |
| 563 | |
| 564 | async def _read_next_chunk(self): |
| 565 | """Read the next message from the stream.""" |
| 566 | # Calculate remaining time until deadline |
| 567 | now = time.monotonic() |
| 568 | |
| 569 | # Check total stream deadline |
| 570 | if now >= self._deadline: |
| 571 | self._exhausted = True |
| 572 | raise DirtyTimeoutError( |
| 573 | "Stream exceeded total timeout", |
| 574 | timeout=self.client.timeout |
| 575 | ) |
| 576 | |
| 577 | remaining = self._deadline - now |
| 578 | |
| 579 | try: |
| 580 | # Fast path: skip timeout wrapper when we have plenty of time |
| 581 | # This avoids asyncio.wait_for() overhead for most chunks |
| 582 | if remaining > self._TIMEOUT_THRESHOLD: |
| 583 | response = await DirtyProtocol.read_message_async( |
| 584 | self.client._reader |
| 585 | ) |
| 586 | else: |
| 587 | # Near deadline: apply timeout protection |
| 588 | read_timeout = min(remaining, self._idle_timeout) |
| 589 | response = await asyncio.wait_for( |
| 590 | DirtyProtocol.read_message_async(self.client._reader), |
| 591 | timeout=read_timeout |
| 592 | ) |
| 593 | except asyncio.TimeoutError: |
| 594 | self._exhausted = True |
| 595 | now = time.monotonic() |
| 596 | if now >= self._deadline: |
| 597 | raise DirtyTimeoutError( |
| 598 | "Stream exceeded total timeout", |
| 599 | timeout=self.client.timeout |
| 600 | ) |
| 601 | idle_duration = now - self._last_chunk_time |
| 602 | raise DirtyTimeoutError( |
| 603 | f"Timeout waiting for next chunk (idle {idle_duration:.1f}s)", |
| 604 | timeout=self._idle_timeout |
| 605 | ) |
| 606 | except Exception as e: |
| 607 | self._exhausted = True |
| 608 | await self.client._close_async() |
| 609 | raise DirtyConnectionError(f"Communication error: {e}") from e |
| 610 | |
| 611 | # Update last chunk time for idle tracking |
| 612 | self._last_chunk_time = time.monotonic() |
| 613 | |
| 614 | msg_type = response.get("type") |
| 615 | |
| 616 | # Chunk message - return the data |
| 617 | if msg_type == DirtyProtocol.MSG_TYPE_CHUNK: |
| 618 | return response.get("data") |
| 619 | |
| 620 | # End message - stop iteration |
| 621 | if msg_type == DirtyProtocol.MSG_TYPE_END: |
no test coverage detected