Create an error ASGI app for syntax errors during reload.
(self, error_msg)
| 87 | self.asgi = self._make_error_app(str(e)) |
| 88 | |
| 89 | def _make_error_app(self, error_msg): |
| 90 | """Create an error ASGI app for syntax errors during reload.""" |
| 91 | async def error_app(scope, receive, send): |
| 92 | if scope["type"] == "http": |
| 93 | await send({ |
| 94 | "type": "http.response.start", |
| 95 | "status": 500, |
| 96 | "headers": [(b"content-type", b"text/plain")], |
| 97 | }) |
| 98 | await send({ |
| 99 | "type": "http.response.body", |
| 100 | "body": f"Application error: {error_msg}".encode(), |
| 101 | }) |
| 102 | elif scope["type"] == "lifespan": |
| 103 | message = await receive() |
| 104 | if message["type"] == "lifespan.startup": |
| 105 | await send({"type": "lifespan.startup.complete"}) |
| 106 | message = await receive() |
| 107 | if message["type"] == "lifespan.shutdown": |
| 108 | await send({"type": "lifespan.shutdown.complete"}) |
| 109 | return error_app |
| 110 | |
| 111 | def init_signals(self): |
| 112 | """Initialize signal handlers for asyncio.""" |