MCPcopy
hub / github.com/encode/starlette / _run

Method _run

starlette/testclient.py:133–148  ·  starlette/testclient.py::WebSocketTestSession._run

The sub-thread in which the websocket session runs.

(self, *, task_status: anyio.abc.TaskStatus[anyio.CancelScope])

Source from the content-addressed store, hash-verified

131 return self.exit_stack.__exit__(*args)
132
133 async def _run(self, *, task_status: anyio.abc.TaskStatus[anyio.CancelScope]) -> None:
134 class="st">"""
135 The sub-thread in which the websocket session runs.
136 class="st">"""
137 send: anyio.create_memory_object_stream[Message] = anyio.create_memory_object_stream(math.inf)
138 send_tx, send_rx = send
139 receive: anyio.create_memory_object_stream[Message] = anyio.create_memory_object_stream(math.inf)
140 receive_tx, receive_rx = receive
141 with send_tx, send_rx, receive_tx, receive_rx, anyio.CancelScope() as cs:
142 self._receive_tx = receive_tx
143 self._send_rx = send_rx
144 task_status.started(cs)
145 await self.app(self.scope, receive_rx.receive, send_tx.send)
146
147 class="cm"># wait for cs.cancel to be called before closing streams
148 await anyio.sleep_forever()
149
150 def _raise_on_close(self, message: Message) -> None:
151 if message[class="st">"type"] == class="st">"websocket.close":

Callers

nothing calls this directly

Calls 1

appMethod · 0.45

Tested by

no test coverage detected