MCPcopy
hub / github.com/fastapi/fastapi / websocket_session

Function websocket_session

fastapi/routing.py:157–178  ·  view source on GitHub ↗

Takes a coroutine `func(session)`, and returns an ASGI application.

(
    func: Callable[[WebSocket], Awaitable[None]],
)

Source from the content-addressed store, hash-verified

155# Copy of starlette.routing.websocket_session modified to include the
156# dependencies' AsyncExitStack
157def websocket_session(
158 func: Callable[[WebSocket], Awaitable[None]],
159) -> ASGIApp:
160 """
161 Takes a coroutine `func(session)`, and returns an ASGI application.
162 """
163 # assert asyncio.iscoroutinefunction(func), "WebSocket endpoints must be async"
164
165 async def app(scope: Scope, receive: Receive, send: Send) -> None:
166 session = WebSocket(scope, receive=receive, send=send)
167
168 async def app(scope: Scope, receive: Receive, send: Send) -> None:
169 async with AsyncExitStack() as request_stack:
170 scope["fastapi_inner_astack"] = request_stack
171 async with AsyncExitStack() as function_stack:
172 scope["fastapi_function_astack"] = function_stack
173 await func(session)
174
175 # Same as in Starlette
176 await wrap_app_handling_exceptions(app, session)(scope, receive, send)
177
178 return app
179
180
181_T = TypeVar("_T")

Callers 1

__init__Method · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…