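Manages ASGI lifespan events (startup/shutdown). The lifespan protocol allows ASGI applications to run code at startup and shutdown. This is essential for applications that need to initialize database connections, caches, or other resources. ASGI lifespan messages: the server sends {"type": "lifespan.startup"} and the app responds with {"type": "lifespan.startup.complete"} or {"type": "lifespan.startup.failed", "message": "..."}; the server then sends {"type": "lifespan.shutdown"} and the app responds with {"type": "lifespan.shutdown.complete"}.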
| 14 | |
| 15 | |
| 16 | class LifespanManager: |
| 17 | """Manages ASGI lifespan events (startup/shutdown). |
| 18 | |
| 19 | The lifespan protocol allows ASGI applications to run code at |
| 20 | startup and shutdown. This is essential for applications that |
| 21 | need to initialize database connections, caches, or other |
| 22 | resources. |
| 23 | |
| 24 | ASGI lifespan messages: |
| 25 | - Server sends: {"type": "lifespan.startup"} |
| 26 | - App responds: {"type": "lifespan.startup.complete"} or |
| 27 | {"type": "lifespan.startup.failed", "message": "..."} |
| 28 | - Server sends: {"type": "lifespan.shutdown"} |
| 29 | - App responds: {"type": "lifespan.shutdown.complete"} |
| 30 | """ |
| 31 | |
def __init__(self, app, logger, state=None):
    """Set up the manager's bookkeeping for a lifespan cycle.

    Args:
        app: ASGI application callable
        logger: Logger instance
        state: Shared state dict for the application; when omitted
            (``None``), a new empty dict is created
    """
    # Only ``None`` is replaced: a dict supplied by the caller, even an
    # empty one, is stored by identity so it stays shared.
    if state is None:
        state = {}

    self.app, self.logger, self.state = app, logger, state

    # One completion signal per lifespan phase; both start unset.
    self._startup_complete = asyncio.Event()
    self._shutdown_complete = asyncio.Event()

    # Failure bookkeeping, initially "nothing has gone wrong".
    self._startup_failed = False
    self._startup_error = None
    self._shutdown_error = None

    # Messages queued for the application; startup() puts the
    # "lifespan.startup" event here.
    self._receive_queue = asyncio.Queue()

    # Set by startup() to the background task running the lifespan.
    self._task = None
    self._app_finished = False
| 52 | |
| 53 | async def startup(self): |
| 54 | """Run lifespan startup and wait for completion. |
| 55 | |
| 56 | Raises: |
| 57 | RuntimeError: If startup fails or app doesn't support lifespan |
| 58 | """ |
| 59 | scope = { |
| 60 | "type": "lifespan", |
| 61 | "asgi": {"version": "3.0", "spec_version": "2.4"}, |
| 62 | "state": self.state, |
| 63 | } |
| 64 | |
| 65 | # Send startup event |
| 66 | await self._receive_queue.put({"type": "lifespan.startup"}) |
| 67 | |
| 68 | # Run lifespan in background task |
| 69 | self._task = asyncio.create_task(self._run_lifespan(scope)) |
| 70 | |
| 71 | # Wait for startup with timeout |
| 72 | try: |
| 73 | await asyncio.wait_for( |
no outgoing calls
no test coverage detected