MCPcopy
hub / github.com/benoitc/gunicorn / _run_lifespan

Method _run_lifespan

gunicorn/asgi/lifespan.py:124–149  ·  view source on GitHub ↗

Run the ASGI lifespan protocol.

(self, scope)

Source from the content-addressed store, hash-verified

122 self.logger.debug("ASGI lifespan shutdown complete")
123
124 async def _run_lifespan(self, scope):
125 """Run the ASGI lifespan protocol."""
126 try:
127 await self.app(scope, self._receive, self._send)
128 except asyncio.CancelledError:
129 raise
130 except Exception as e:
131 self.logger.debug("Lifespan application raised: %s", e)
132 # If startup hasn't completed, mark it as failed
133 if not self._startup_complete.is_set():
134 self._startup_failed = True
135 self._startup_error = str(e)
136 self._startup_complete.set()
137 # If shutdown hasn't completed, mark error
138 elif not self._shutdown_complete.is_set():
139 self._shutdown_error = str(e)
140 self._shutdown_complete.set()
141 finally:
142 self._app_finished = True
143 # Ensure events are set to unblock waiters
144 if not self._startup_complete.is_set():
145 self._startup_failed = True
146 self._startup_error = "Application exited before startup complete"
147 self._startup_complete.set()
148 if not self._shutdown_complete.is_set():
149 self._shutdown_complete.set()
150
151 async def _receive(self):
152 """ASGI receive callable for lifespan."""

Callers 1

startupMethod · 0.95

Calls 3

appMethod · 0.45
debugMethod · 0.45
setMethod · 0.45

Tested by

no test coverage detected