MCPcopy
hub / github.com/benoitc/gunicorn / startup

Method startup

gunicorn/asgi/lifespan.py:53–88  ·  view source on GitHub ↗

Run lifespan startup and wait for completion. Raises RuntimeError if startup fails, times out, or the app doesn't support lifespan.

(self)

Source from the content-addressed store, hash-verified

51 self._app_finished = False
52
async def startup(self):
    """Run lifespan startup and wait for completion.

    Queues a ``lifespan.startup`` event, starts ``_run_lifespan`` as a
    background task with a lifespan scope, then waits up to 30 seconds
    for ``_startup_complete`` to be set.

    Raises:
        RuntimeError: If startup fails or app doesn't support lifespan,
            or if startup does not complete within the timeout. On
            timeout the original ``asyncio.TimeoutError`` is attached
            as ``__cause__``.
    """
    scope = {
        "type": "lifespan",
        "asgi": {"version": "3.0", "spec_version": "2.4"},
        "state": self.state,
    }

    # Queue the startup event before launching the background task.
    await self._receive_queue.put({"type": "lifespan.startup"})

    # Run lifespan in a background task; it stays referenced on self so
    # it can be cancelled below.
    self._task = asyncio.create_task(self._run_lifespan(scope))

    # Wait for the startup-complete signal, bounded by a fixed timeout.
    try:
        await asyncio.wait_for(
            self._startup_complete.wait(),
            timeout=30.0,  # Reasonable startup timeout
        )
    except asyncio.TimeoutError as exc:
        if self._task:
            self._task.cancel()
        # Chain explicitly so the traceback names the timeout as the
        # direct cause instead of an unrelated error during handling.
        raise RuntimeError("Lifespan startup timed out") from exc

    if self._startup_failed:
        if self._task:
            self._task.cancel()
        msg = self._startup_error or "Unknown error"
        raise RuntimeError(f"Lifespan startup failed: {msg}")

    self.logger.debug("ASGI lifespan startup complete")
89
90 async def shutdown(self):
91 """Signal shutdown and wait for completion.

Calls 4

_run_lifespanMethod · 0.95
putMethod · 0.80
waitMethod · 0.45
debugMethod · 0.45