Async iterator for streaming responses from dirty workers. This class is returned by `DirtyClient.stream_async()` and yields chunks from a streaming response until the end message is received. Uses a deadline-based timeout approach for efficiency: - Total stream timeout: limit
| 490 | |
| 491 | |
| 492 | class DirtyAsyncStreamIterator: |
| 493 | """ |
| 494 | Async iterator for streaming responses from dirty workers. |
| 495 | |
| 496 | This class is returned by `DirtyClient.stream_async()` and yields chunks |
| 497 | from a streaming response until the end message is received. |
| 498 | |
| 499 | Uses a deadline-based timeout approach for efficiency: |
| 500 | - Total stream timeout: limits entire stream duration |
| 501 | - Idle timeout: limits gap between chunks (defaults to total timeout) |
| 502 | |
| 503 | This avoids the overhead of asyncio.wait_for() on every chunk read. |
| 504 | """ |
| 505 | |
| 506 | # Default idle timeout between chunks (seconds) |
| 507 | DEFAULT_IDLE_TIMEOUT = 30.0 |
| 508 | |
| 509 | def __init__(self, client, app_path, action, args, kwargs, |
| 510 | idle_timeout=None): |
| 511 | self.client = client |
| 512 | self.app_path = app_path |
| 513 | self.action = action |
| 514 | self.args = args |
| 515 | self.kwargs = kwargs |
| 516 | self._started = False |
| 517 | self._exhausted = False |
| 518 | self._request_id = None |
| 519 | self._deadline = None |
| 520 | self._last_chunk_time = None |
| 521 | # Idle timeout: max time between chunks |
| 522 | self._idle_timeout = ( |
| 523 | idle_timeout if idle_timeout is not None |
| 524 | else min(self.DEFAULT_IDLE_TIMEOUT, client.timeout) |
| 525 | ) |
| 526 | |
| 527 | def __aiter__(self): |
| 528 | return self |
| 529 | |
| 530 | async def __anext__(self): |
| 531 | if self._exhausted: |
| 532 | raise StopAsyncIteration |
| 533 | |
| 534 | if not self._started: |
| 535 | await self._start_request() |
| 536 | self._started = True |
| 537 | |
| 538 | return await self._read_next_chunk() |
| 539 | |
| 540 | async def _start_request(self): |
| 541 | """Send the initial request to the arbiter.""" |
| 542 | if self.client._writer is None: |
| 543 | await self.client.connect_async() |
| 544 | |
| 545 | # Set deadline for entire stream |
| 546 | now = time.monotonic() |
| 547 | self._deadline = now + self.client.timeout |
| 548 | self._last_chunk_time = now |
| 549 |
no outgoing calls
no test coverage detected