(
self,
content: ContentStream,
status_code: int = 200,
headers: Mapping[str, str] | None = None,
media_type: str | None = None,
background: BackgroundTask | None = None,
)
| 223 | body_iterator: AsyncContentStream |
| 224 | |
| 225 | def __init__( |
| 226 | self, |
| 227 | content: ContentStream, |
| 228 | status_code: int = 200, |
| 229 | headers: Mapping[str, str] | None = None, |
| 230 | media_type: str | None = None, |
| 231 | background: BackgroundTask | None = None, |
| 232 | ) -> None: |
| 233 | if isinstance(content, AsyncIterable): |
| 234 | self.body_iterator = content |
| 235 | else: |
| 236 | self.body_iterator = iterate_in_threadpool(content) |
| 237 | self.status_code = status_code |
| 238 | self.media_type = self.media_type if media_type is None else media_type |
| 239 | self.background = background |
| 240 | self.init_headers(headers) |
| 241 | |
| 242 | async def listen_for_disconnect(self, receive: Receive) -> None: |
| 243 | while True: |
nothing calls this directly
no test coverage detected