MCPcopy
hub / github.com/benoitc/gunicorn / LifespanManager

Class LifespanManager

gunicorn/asgi/lifespan.py:16–178  ·  view source on GitHub ↗

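Manages ASGI lifespan events (startup/shutdown). The lifespan protocol allows ASGI applications to run code at startup and shutdown. This is essential for applications that need to initialize database connections, caches, or other resources. ASGI lifespan messages: at startup the server sends {"type": "lifespan.startup"} and the app responds with {"type": "lifespan.startup.complete"} or {"type": "lifespan.startup.failed", "message": "..."}; at shutdown the server sends {"type": "lifespan.shutdown"} and the app responds with {"type": "lifespan.shutdown.complete"}.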

Source from the content-addressed store, hash-verified

14
15
16class LifespanManager:
17 """Manages ASGI lifespan events (startup/shutdown).
18
19 The lifespan protocol allows ASGI applications to run code at
20 startup and shutdown. This is essential for applications that
21 need to initialize database connections, caches, or other
22 resources.
23
24 ASGI lifespan messages:
25 - Server sends: {"type": "lifespan.startup"}
26 - App responds: {"type": "lifespan.startup.complete"} or
27 {"type": "lifespan.startup.failed", "message": "..."}
28 - Server sends: {"type": "lifespan.shutdown"}
29 - App responds: {"type": "lifespan.shutdown.complete"}
30 """
31
32 def __init__(self, app, logger, state=None):
33 """Initialize the lifespan manager.
34
35 Args:
36 app: ASGI application callable
37 logger: Logger instance
38 state: Shared state dict for the application
39 """
40 self.app = app
41 self.logger = logger
42 self.state = state if state is not None else {}
43
44 self._startup_complete = asyncio.Event()
45 self._shutdown_complete = asyncio.Event()
46 self._startup_failed = False
47 self._startup_error = None
48 self._shutdown_error = None
49 self._receive_queue = asyncio.Queue()
50 self._task = None
51 self._app_finished = False
52
53 async def startup(self):
54 """Run lifespan startup and wait for completion.
55
56 Raises:
57 RuntimeError: If startup fails or app doesn't support lifespan
58 """
59 scope = {
60 "type": "lifespan",
61 "asgi": {"version": "3.0", "spec_version": "2.4"},
62 "state": self.state,
63 }
64
65 # Send startup event
66 await self._receive_queue.put({"type": "lifespan.startup"})
67
68 # Run lifespan in background task
69 self._task = asyncio.create_task(self._run_lifespan(scope))
70
71 # Wait for startup with timeout
72 try:
73 await asyncio.wait_for(

Callers 10

_serve · Method · 0.90
_create_manager · Method · 0.90
_create_manager · Method · 0.90
_create_manager · Method · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected