MCPcopy
hub / github.com/encode/uvicorn / send

Method send

uvicorn/lifespan/on.py:102–134  ·  view source on GitHub ↗
(self, message: LifespanSendMessage)

Source from the content-addressed store, hash-verified

100 self.shutdown_event.set()
101
102 async def send(self, message: LifespanSendMessage) -> None:
103 assert message["type"] in (
104 "lifespan.startup.complete",
105 "lifespan.startup.failed",
106 "lifespan.shutdown.complete",
107 "lifespan.shutdown.failed",
108 )
109
110 if message["type"] == "lifespan.startup.complete":
111 assert not self.startup_event.is_set(), STATE_TRANSITION_ERROR
112 assert not self.shutdown_event.is_set(), STATE_TRANSITION_ERROR
113 self.startup_event.set()
114
115 elif message["type"] == "lifespan.startup.failed":
116 assert not self.startup_event.is_set(), STATE_TRANSITION_ERROR
117 assert not self.shutdown_event.is_set(), STATE_TRANSITION_ERROR
118 self.startup_event.set()
119 self.startup_failed = True
120 if message.get("message"):
121 self.logger.error(message["message"])
122
123 elif message["type"] == "lifespan.shutdown.complete":
124 assert self.startup_event.is_set(), STATE_TRANSITION_ERROR
125 assert not self.shutdown_event.is_set(), STATE_TRANSITION_ERROR
126 self.shutdown_event.set()
127
128 elif message["type"] == "lifespan.shutdown.failed":
129 assert self.startup_event.is_set(), STATE_TRANSITION_ERROR
130 assert not self.shutdown_event.is_set(), STATE_TRANSITION_ERROR
131 self.shutdown_event.set()
132 self.shutdown_failed = True
133 if message.get("message"):
134 self.logger.error(message["message"])
135
136 async def receive(self) -> LifespanReceiveMessage:
137 return await self.receive_queue.get()

Callers 9

pingMethod · 0.45
pongMethod · 0.45
websocket_connectMethod · 0.45
websocket_receiveMethod · 0.45
send_textFunction · 0.45
test_app_closeFunction · 0.45
websocket_sessionFunction · 0.45

Calls

no outgoing calls