(self)
| 175 | self.handle_events() |
| 176 | |
| 177 | def handle_events(self) -> None: |
| 178 | while True: |
| 179 | try: |
| 180 | event = self.conn.next_event() |
| 181 | except h11.RemoteProtocolError: |
| 182 | msg = "Invalid HTTP request received." |
| 183 | self.logger.warning(msg) |
| 184 | self.send_400_response(msg) |
| 185 | return |
| 186 | |
| 187 | if event is h11.NEED_DATA: |
| 188 | break |
| 189 | |
| 190 | elif event is h11.PAUSED: |
| 191 | # This case can occur in HTTP pipelining, so we need to |
| 192 | # stop reading any more data, and ensure that at the end |
| 193 | # of the active request/response cycle we handle any |
| 194 | # events that have been buffered up. |
| 195 | self.flow.pause_reading() |
| 196 | break |
| 197 | |
| 198 | elif isinstance(event, h11.Request): |
| 199 | self.headers = [(key.lower(), value) for key, value in event.headers] |
| 200 | raw_path, _, query_string = event.target.partition(b"?") |
| 201 | path = unquote(raw_path.decode("ascii")) |
| 202 | full_path = self.root_path + path |
| 203 | full_raw_path = self.root_path.encode("ascii") + raw_path |
| 204 | self.scope = { |
| 205 | "type": "http", |
| 206 | "asgi": {"version": self.config.asgi_version, "spec_version": "2.3"}, |
| 207 | "http_version": event.http_version.decode("ascii"), |
| 208 | "server": self.server, |
| 209 | "client": self.client, |
| 210 | "scheme": self.scheme, # type: ignore[typeddict-item] |
| 211 | "method": event.method.decode("ascii"), |
| 212 | "root_path": self.root_path, |
| 213 | "path": full_path, |
| 214 | "raw_path": full_raw_path, |
| 215 | "query_string": query_string, |
| 216 | "headers": self.headers, |
| 217 | "state": self.app_state.copy(), |
| 218 | } |
| 219 | if self._should_upgrade(): |
| 220 | self.handle_websocket_upgrade(event) |
| 221 | return |
| 222 | |
| 223 | # Handle 503 responses when 'limit_concurrency' is exceeded. |
| 224 | if self.limit_concurrency is not None and ( |
| 225 | len(self.connections) >= self.limit_concurrency or len(self.tasks) >= self.limit_concurrency |
| 226 | ): |
| 227 | app = service_unavailable |
| 228 | message = "Exceeded concurrency limit." |
| 229 | self.logger.warning(message) |
| 230 | else: |
| 231 | app = self.app |
| 232 | |
| 233 | # When starting to process a request, disable the keep-alive |
| 234 | # timeout. Normally we disable this when receiving data from |
no test coverage detected