MCPcopy Index your code
hub / github.com/fastapi/fastapi / get_websocket_app

Function get_websocket_app

fastapi/routing.py:748–782  ·  view source on GitHub ↗
(
    dependant: Dependant,
    dependency_overrides_provider: Any | None = None,
    embed_body_fields: bool = False,
)

Source from the content-addressed store, hash-verified

746
747
748def get_websocket_app(
749 dependant: Dependant,
750 dependency_overrides_provider: Any | None = None,
751 embed_body_fields: bool = False,
752) -> Callable[[WebSocket], Coroutine[Any, Any, Any]]:
753 async def app(websocket: WebSocket) -> None:
754 endpoint_ctx = (
755 _extract_endpoint_context(dependant.call)
756 if dependant.call
757 else EndpointContext()
758 )
759 if dependant.path:
760 # For mounted sub-apps, include the mount path prefix
761 mount_path = websocket.scope.get("root_path", "").rstrip("/")
762 endpoint_ctx["path"] = f"WS {mount_path}{dependant.path}"
763 async_exit_stack = websocket.scope.get("fastapi_inner_astack")
764 assert isinstance(async_exit_stack, AsyncExitStack), (
765 "fastapi_inner_astack not found in request scope"
766 )
767 solved_result = await solve_dependencies(
768 request=websocket,
769 dependant=dependant,
770 dependency_overrides_provider=dependency_overrides_provider,
771 async_exit_stack=async_exit_stack,
772 embed_body_fields=embed_body_fields,
773 )
774 if solved_result.errors:
775 raise WebSocketRequestValidationError(
776 solved_result.errors,
777 endpoint_ctx=endpoint_ctx,
778 )
779 assert dependant.call is not None, "dependant.call must be a function"
780 await dependant.call(**solved_result.values)
781
782 return app
783
784
785class APIWebSocketRoute(routing.WebSocketRoute):

Callers 1

__init__Method · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…