Converts a `tornado.httputil.HTTPServerRequest` to a WSGI environment. .. versionchanged:: 6.3 No longer a static method.
(self, request: httputil.HTTPServerRequest)
| 205 | self._log(status_code, request) |
| 206 | |
| 207 | def environ(self, request: httputil.HTTPServerRequest) -> Dict[str, Any]: |
| 208 | """Converts a `tornado.httputil.HTTPServerRequest` to a WSGI environment. |
| 209 | |
| 210 | .. versionchanged:: 6.3 |
| 211 | No longer a static method. |
| 212 | """ |
| 213 | hostport = request.host.split(":") |
| 214 | if len(hostport) == 2: |
| 215 | host = hostport[0] |
| 216 | port = int(hostport[1]) |
| 217 | else: |
| 218 | host = request.host |
| 219 | port = 443 if request.protocol == "https" else 80 |
| 220 | environ = { |
| 221 | "REQUEST_METHOD": request.method, |
| 222 | "SCRIPT_NAME": "", |
| 223 | "PATH_INFO": to_wsgi_str( |
| 224 | escape.url_unescape(request.path, encoding=None, plus=False) |
| 225 | ), |
| 226 | "QUERY_STRING": request.query, |
| 227 | "REMOTE_ADDR": request.remote_ip, |
| 228 | "SERVER_NAME": host, |
| 229 | "SERVER_PORT": str(port), |
| 230 | "SERVER_PROTOCOL": request.version, |
| 231 | "wsgi.version": (1, 0), |
| 232 | "wsgi.url_scheme": request.protocol, |
| 233 | "wsgi.input": BytesIO(escape.utf8(request.body)), |
| 234 | "wsgi.errors": sys.stderr, |
| 235 | "wsgi.multithread": self.executor is not dummy_executor, |
| 236 | "wsgi.multiprocess": True, |
| 237 | "wsgi.run_once": False, |
| 238 | } |
| 239 | if "Content-Type" in request.headers: |
| 240 | environ["CONTENT_TYPE"] = request.headers.pop("Content-Type") |
| 241 | if "Content-Length" in request.headers: |
| 242 | environ["CONTENT_LENGTH"] = request.headers.pop("Content-Length") |
| 243 | for key, value in request.headers.items(): |
| 244 | environ["HTTP_" + key.replace("-", "_").upper()] = value |
| 245 | return environ |
| 246 | |
| 247 | def _log(self, status_code: int, request: httputil.HTTPServerRequest) -> None: |
| 248 | if status_code < 400: |
no test coverage detected