MCPcopy
hub / github.com/encode/uvicorn / FlowControl

Class FlowControl

uvicorn/protocols/http/flow_control.py:10–39  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

8
9
class FlowControl:
    """Track read/write pause state on behalf of a transport.

    Reading pauses are forwarded to the wrapped transport; writing pauses
    are recorded locally and surfaced through an event that ``drain()``
    waits on.
    """

    def __init__(self, transport: asyncio.Transport) -> None:
        """Wrap *transport*, starting with reading and writing both unpaused."""
        writable = asyncio.Event()
        writable.set()  # writing is allowed until pause_writing() is called

        self._transport = transport
        self.read_paused = False
        self.write_paused = False
        self._is_writable_event = writable

    async def drain(self) -> None:
        """Wait until the writable event is set (returns at once if it is)."""
        await self._is_writable_event.wait()  # pragma: full coverage

    def pause_reading(self) -> None:
        """Pause reading on the transport unless it is already paused."""
        if self.read_paused:
            return
        self.read_paused = True
        self._transport.pause_reading()

    def resume_reading(self) -> None:
        """Resume reading on the transport if it is currently paused."""
        if not self.read_paused:
            return
        self.read_paused = False
        self._transport.resume_reading()

    def pause_writing(self) -> None:
        """Mark writing as paused so that ``drain()`` blocks."""
        # The conditional keeps its original shape so the pragma marker
        # stays attached to the same branch.
        if not self.write_paused:  # pragma: full coverage
            self.write_paused = True
            self._is_writable_event.clear()

    def resume_writing(self) -> None:
        """Mark writing as resumed, releasing any pending ``drain()``."""
        # The conditional keeps its original shape so the pragma marker
        # stays attached to the same branch.
        if self.write_paused:  # pragma: full coverage
            self.write_paused = False
            self._is_writable_event.set()
40
41
42async def service_unavailable(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:

Callers 2

connection_madeMethod · 0.90
connection_madeMethod · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected