MCPcopy
hub / github.com/fastapi/fastapi / get_request_handler

Function get_request_handler

fastapi/routing.py:367–745  ·  view source on GitHub ↗
(
    dependant: Dependant,
    body_field: ModelField | None = None,
    status_code: int | None = None,
    response_class: type[Response] | DefaultPlaceholder = Default(JSONResponse),
    response_field: ModelField | None = None,
    response_model_include: IncEx | None = None,
    response_model_exclude: IncEx | None = None,
    response_model_by_alias: bool = True,
    response_model_exclude_unset: bool = False,
    response_model_exclude_defaults: bool = False,
    response_model_exclude_none: bool = False,
    dependency_overrides_provider: Any | None = None,
    embed_body_fields: bool = False,
    strict_content_type: bool | DefaultPlaceholder = Default(True),
    stream_item_field: ModelField | None = None,
    is_json_stream: bool = False,
)

Source from the content-addressed store, hash-verified

365
366
367def get_request_handler(
368 dependant: Dependant,
369 body_field: ModelField | None = None,
370 status_code: int | None = None,
371 response_class: type[Response] | DefaultPlaceholder = Default(JSONResponse),
372 response_field: ModelField | None = None,
373 response_model_include: IncEx | None = None,
374 response_model_exclude: IncEx | None = None,
375 response_model_by_alias: bool = True,
376 response_model_exclude_unset: bool = False,
377 response_model_exclude_defaults: bool = False,
378 response_model_exclude_none: bool = False,
379 dependency_overrides_provider: Any | None = None,
380 embed_body_fields: bool = False,
381 strict_content_type: bool | DefaultPlaceholder = Default(True),
382 stream_item_field: ModelField | None = None,
383 is_json_stream: bool = False,
384) -> Callable[[Request], Coroutine[Any, Any, Response]]:
385 assert dependant.call is not None, "dependant.call must be a function"
386 is_coroutine = dependant.is_coroutine_callable
387 is_body_form = body_field and isinstance(body_field.field_info, params.Form)
388 if isinstance(response_class, DefaultPlaceholder):
389 actual_response_class: type[Response] = response_class.value
390 else:
391 actual_response_class = response_class
392 is_sse_stream = lenient_issubclass(actual_response_class, EventSourceResponse)
393 if isinstance(strict_content_type, DefaultPlaceholder):
394 actual_strict_content_type: bool = strict_content_type.value
395 else:
396 actual_strict_content_type = strict_content_type
397
398 async def app(request: Request) -> Response:
399 response: Response | None = None
400 file_stack = request.scope.get("fastapi_middleware_astack")
401 assert isinstance(file_stack, AsyncExitStack), (
402 "fastapi_middleware_astack not found in request scope"
403 )
404
405 # Extract endpoint context for error messages
406 endpoint_ctx = (
407 _extract_endpoint_context(dependant.call)
408 if dependant.call
409 else EndpointContext()
410 )
411
412 if dependant.path:
413 # For mounted sub-apps, include the mount path prefix
414 mount_path = request.scope.get("root_path", "").rstrip("/")
415 endpoint_ctx["path"] = f"{request.method} {mount_path}{dependant.path}"
416
417 # Read body and auto-close files
418 try:
419 body: Any = None
420 if body_field:
421 if is_body_form:
422 body = await request.form()
423 file_stack.push_async_callback(body.close)
424 else:

Callers 1

get_route_handlerMethod · 0.85

Calls 2

DefaultFunction · 0.90
lenient_issubclassFunction · 0.90

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…