MCPcopy
hub / github.com/encode/uvicorn / LifespanOn

Class LifespanOn

uvicorn/lifespan/on.py:31–137  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

29
30
31class LifespanOn:
32 def __init__(self, config: Config) -> None:
33 if not config.loaded:
34 config.load()
35
36 self.config = config
37 self.logger = logging.getLogger("uvicorn.error")
38 self.startup_event = asyncio.Event()
39 self.shutdown_event = asyncio.Event()
40 self.receive_queue: Queue[LifespanReceiveMessage] = asyncio.Queue()
41 self.error_occurred = False
42 self.startup_failed = False
43 self.shutdown_failed = False
44 self.should_exit = False
45 self.state: dict[str, Any] = {}
46
47 async def startup(self) -> None:
48 self.logger.info("Waiting for application startup.")
49
50 loop = asyncio.get_event_loop()
51 main_lifespan_task = loop.create_task(self.main()) # noqa: F841
52 # Keep a hard reference to prevent garbage collection
53 # See https://github.com/Kludex/uvicorn/pull/972
54 startup_event: LifespanStartupEvent = {"type": "lifespan.startup"}
55 await self.receive_queue.put(startup_event)
56 await self.startup_event.wait()
57
58 if self.startup_failed or (self.error_occurred and self.config.lifespan == "on"):
59 self.logger.error("Application startup failed. Exiting.")
60 self.should_exit = True
61 else:
62 self.logger.info("Application startup complete.")
63
64 async def shutdown(self) -> None:
65 if self.error_occurred:
66 return
67 self.logger.info("Waiting for application shutdown.")
68 shutdown_event: LifespanShutdownEvent = {"type": "lifespan.shutdown"}
69 await self.receive_queue.put(shutdown_event)
70 await self.shutdown_event.wait()
71
72 if self.shutdown_failed or (self.error_occurred and self.config.lifespan == "on"):
73 self.logger.error("Application shutdown failed. Exiting.")
74 self.should_exit = True
75 else:
76 self.logger.info("Application shutdown complete.")
77
78 async def main(self) -> None:
79 try:
80 app = self.config.loaded_app
81 scope: LifespanScope = {
82 "type": "lifespan",
83 "asgi": {"version": self.config.asgi_version, "spec_version": "2.0"},
84 "state": self.state,
85 }
86 await app(scope, self.receive, self.send)
87 except BaseException as exc:
88 self.asgi = None

Callers 2

testFunction · 0.90
test_lifespan_stateFunction · 0.90

Calls

no outgoing calls

Tested by 2

testFunction · 0.72
test_lifespan_stateFunction · 0.72