hub / github.com/benoitc/gunicorn / DirtyAsyncStreamIterator

Class DirtyAsyncStreamIterator

gunicorn/dirty/client.py:492–638

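Async iterator for streaming responses from dirty workers. This class is returned by `DirtyClient.stream_async()` and yields chunks from a streaming response until the end message is received. It uses a deadline-based timeout approach for efficiency: a total stream timeout limits the entire stream duration, and an idle timeout limits the gap between chunks. This avoids the overhead of `asyncio.wait_for()` on every chunk read.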
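As a rough illustration of how a caller might consume an iterator of this kind, the sketch below uses a hypothetical stand-in class, `FakeStream`, that follows the same `__aiter__`/`__anext__` protocol. It is not gunicorn code: the arguments accepted by `DirtyClient.stream_async()` are not shown in this excerpt, so no real client call is attempted, and `FakeStream` and `collect` are names invented for the example.

```python
import asyncio


class FakeStream:
    """Hypothetical stand-in that mimics the async-iterator protocol."""

    def __init__(self, chunks):
        self._chunks = list(chunks)
        self._exhausted = False

    def __aiter__(self):
        # An async iterator returns itself from __aiter__.
        return self

    async def __anext__(self):
        if self._exhausted or not self._chunks:
            # Remember exhaustion so later calls stop immediately.
            self._exhausted = True
            raise StopAsyncIteration
        await asyncio.sleep(0)  # yield control, as a real socket read would
        return self._chunks.pop(0)


async def collect(stream):
    """Drain an async iterator into a list using `async for`."""
    return [chunk async for chunk in stream]


result = asyncio.run(collect(FakeStream(["a", "b", "c"])))
print(result)
```

Because the request is only sent on the first `__anext__` call in the class documented here, constructing the iterator should be cheap; the work starts once the `async for` loop begins.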

Source from the content-addressed store, hash-verified

class DirtyAsyncStreamIterator:
    """
    Async iterator for streaming responses from dirty workers.

    This class is returned by `DirtyClient.stream_async()` and yields chunks
    from a streaming response until the end message is received.

    Uses a deadline-based timeout approach for efficiency:
    - Total stream timeout: limits entire stream duration
    - Idle timeout: limits gap between chunks (defaults to total timeout)

    This avoids the overhead of asyncio.wait_for() on every chunk read.
    """

    # Default idle timeout between chunks (seconds)
    DEFAULT_IDLE_TIMEOUT = 30.0

    def __init__(self, client, app_path, action, args, kwargs,
                 idle_timeout=None):
        self.client = client
        self.app_path = app_path
        self.action = action
        self.args = args
        self.kwargs = kwargs
        self._started = False
        self._exhausted = False
        self._request_id = None
        self._deadline = None
        self._last_chunk_time = None
        # Idle timeout: max time between chunks
        self._idle_timeout = (
            idle_timeout if idle_timeout is not None
            else min(self.DEFAULT_IDLE_TIMEOUT, client.timeout)
        )

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._exhausted:
            raise StopAsyncIteration

        if not self._started:
            await self._start_request()
            self._started = True

        return await self._read_next_chunk()

    async def _start_request(self):
        """Send the initial request to the arbiter."""
        if self.client._writer is None:
            await self.client.connect_async()

        # Set deadline for entire stream
        now = time.monotonic()
        self._deadline = now + self.client.timeout
        self._last_chunk_time = now
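The excerpt ends before `_read_next_chunk()`, so how the two limits are enforced is not visible here. The sketch below is one plausible way to combine a total deadline with an idle limit, under stated assumptions: `resolve_idle_timeout` reproduces the default chosen in `__init__` (the smaller of `DEFAULT_IDLE_TIMEOUT` and the client timeout, which is narrower than the docstring's "defaults to total timeout"), while `remaining_wait` is a hypothetical helper and not part of gunicorn.

```python
import time

DEFAULT_IDLE_TIMEOUT = 30.0  # same value as the class constant in the excerpt


def resolve_idle_timeout(idle_timeout, client_timeout):
    """Pick the idle timeout the same way __init__ does in the excerpt."""
    if idle_timeout is not None:
        return idle_timeout
    return min(DEFAULT_IDLE_TIMEOUT, client_timeout)


def remaining_wait(deadline, last_chunk_time, idle_timeout, now=None):
    """Hypothetical helper: seconds a chunk read may still block.

    Returns the smaller of the time left before the total deadline and the
    time left before the idle limit. Raises TimeoutError if either limit
    has already passed.
    """
    if now is None:
        now = time.monotonic()
    total_left = deadline - now
    idle_left = (last_chunk_time + idle_timeout) - now
    wait = min(total_left, idle_left)
    if wait <= 0:
        raise TimeoutError("stream deadline or idle timeout exceeded")
    return wait


# Worked example with fixed clock values instead of time.monotonic().
start = 100.0
client_timeout = 120.0
idle = resolve_idle_timeout(None, client_timeout)
deadline = start + client_timeout
wait = remaining_wait(deadline, last_chunk_time=105.0,
                      idle_timeout=idle, now=110.0)
print(idle, deadline, wait)
```

Tracking absolute timestamps this way means each read only needs a subtraction and a comparison, which fits the docstring's stated aim of avoiding a per-chunk `asyncio.wait_for()` wrapper.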

Calls

no outgoing calls

Tested by

no test coverage detected