Body receiver for callback-based parsers. Body chunks are fed directly via the feed() method from parser callbacks. Uses Future-based waiting for efficient async receive().
| 160 | |
| 161 | |
| 162 | class BodyReceiver: |
| 163 | """Body receiver for callback-based parsers. |
| 164 | |
| 165 | Body chunks are fed directly via the feed() method from parser callbacks. |
| 166 | Uses Future-based waiting for efficient async receive(). |
| 167 | """ |
| 168 | |
| 169 | __slots__ = ('_chunks', '_complete', '_body_finished', '_closed', |
| 170 | '_body_wait_expired', '_waiter', 'request', 'protocol') |
| 171 | |
| 172 | def __init__(self, request, protocol): |
| 173 | self.request = request |
| 174 | self.protocol = protocol |
| 175 | self._chunks = [] |
| 176 | self._complete = False |
| 177 | self._body_finished = False # True after returning more_body=False |
| 178 | # _closed means the client transport has gone away (signal_disconnect |
| 179 | # was called or the protocol detected a disconnect). _body_wait_expired |
| 180 | # means the body did not finish framing within the configured timeout |
| 181 | # but the transport itself may still be open. Both surface as |
| 182 | # http.disconnect to the app, but they are distinct conditions. |
| 183 | self._closed = False |
| 184 | self._body_wait_expired = False |
| 185 | self._waiter = None |
| 186 | |
def feed(self, chunk):
    """Buffer one body chunk handed over by a parser callback.

    Falsy chunks (for example ``b""``) are ignored: nothing is buffered
    and no waiter is woken.
    """
    if not chunk:
        return
    self._chunks.append(chunk)
    # New data is available, so release any receive() that is waiting.
    self._wake_waiter()
| 192 | |
def set_complete(self):
    """Record that the message has ended, i.e. the body is complete."""
    # Set the flag first so it is already visible when the waiter is woken.
    self._complete = True
    self._wake_waiter()
| 197 | |
def signal_disconnect(self):
    """Record that the client transport has gone away."""
    # Set the flag first so it is already visible when the waiter is woken.
    self._closed = True
    self._wake_waiter()
| 202 | |
| 203 | @property |
| 204 | def _disconnected(self): |
| 205 | """True when the receiver should yield http.disconnect to the app.""" |
| 206 | return self._closed or self._body_wait_expired |
| 207 | |
| 208 | def _wake_waiter(self): |
| 209 | """Wake up any pending receive() call.""" |
| 210 | if self._waiter is not None and not self._waiter.done(): |
| 211 | self._waiter.set_result(None) |
| 212 | |
| 213 | async def receive(self): # pylint: disable=too-many-return-statements |
| 214 | """ASGI receive callable - returns body chunks or disconnect.""" |
| 215 | # Already disconnected (transport closed or body wait timed out) |
| 216 | if self._disconnected: |
| 217 | return {"type": "http.disconnect"} |
| 218 | |
| 219 | # Body finished but not disconnected - wait for actual disconnect |
no outgoing calls