| 294 | |
| 295 | |
| 296 | class FileResponse(Response): |
| 297 | chunk_size = 64 * 1024 |
| 298 | |
| 299 | def __init__( |
| 300 | self, |
| 301 | path: str | os.PathLike[str], |
| 302 | status_code: int = 200, |
| 303 | headers: Mapping[str, str] | None = None, |
| 304 | media_type: str | None = None, |
| 305 | background: BackgroundTask | None = None, |
| 306 | filename: str | None = None, |
| 307 | stat_result: os.stat_result | None = None, |
| 308 | content_disposition_type: str = "attachment", |
| 309 | ) -> None: |
| 310 | self.path = path |
| 311 | self.status_code = status_code |
| 312 | self.filename = filename |
| 313 | if media_type is None: |
| 314 | media_type = guess_type(filename or path)[0] or "application/octet-stream" |
| 315 | self.media_type = media_type |
| 316 | self.background = background |
| 317 | self.init_headers(headers) |
| 318 | self.headers.setdefault("accept-ranges", "bytes") |
| 319 | if self.filename is not None: |
| 320 | content_disposition_filename = quote(self.filename) |
| 321 | if content_disposition_filename != self.filename: |
| 322 | content_disposition = f"{content_disposition_type}; filename*=utf-8''{content_disposition_filename}" |
| 323 | else: |
| 324 | content_disposition = f'{content_disposition_type}; filename="{self.filename}"' |
| 325 | self.headers.setdefault("content-disposition", content_disposition) |
| 326 | self.stat_result = stat_result |
| 327 | if stat_result is not None: |
| 328 | self.set_stat_headers(stat_result) |
| 329 | |
| 330 | def set_stat_headers(self, stat_result: os.stat_result) -> None: |
| 331 | content_length = str(stat_result.st_size) |
| 332 | last_modified = formatdate(stat_result.st_mtime, usegmt=True) |
| 333 | etag_base = str(stat_result.st_mtime) + "-" + str(stat_result.st_size) |
| 334 | etag = f'"{hashlib.md5(etag_base.encode(), usedforsecurity=False).hexdigest()}"' |
| 335 | |
| 336 | self.headers.setdefault("content-length", content_length) |
| 337 | self.headers.setdefault("last-modified", last_modified) |
| 338 | self.headers.setdefault("etag", etag) |
| 339 | |
| 340 | async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: |
| 341 | scope_type = scope["type"] |
| 342 | send_header_only = scope_type == "http" and scope["method"].upper() == "HEAD" |
| 343 | send_pathsend = scope_type == "http" and "http.response.pathsend" in scope.get("extensions", {}) |
| 344 | if scope_type == "websocket": |
| 345 | send = self._wrap_websocket_denial_send(send) |
| 346 | |
| 347 | if self.stat_result is None: |
| 348 | try: |
| 349 | stat_result = await anyio.to_thread.run_sync(os.stat, self.path) |
| 350 | self.set_stat_headers(stat_result) |
| 351 | except FileNotFoundError: |
| 352 | raise RuntimeError(f"File at path {self.path} does not exist.") |
| 353 | else: |
no outgoing calls