MCPcopy
hub / github.com/fastapi/fastapi / _serialize_sse_item

Function _serialize_sse_item

fastapi/routing.py:516–543  ·  view source on GitHub ↗
(item: Any)

Source from the content-addressed store, hash-verified

514 gen = dependant.call(**solved_result.values)
515
516 def _serialize_sse_item(item: Any) -> bytes:
517 if isinstance(item, ServerSentEvent):
518 # User controls the event structure.
519 # Serialize the data payload if present.
520 # For ServerSentEvent items we skip stream_item_field
521 # validation (the user may mix types intentionally).
522 if item.raw_data is not None:
523 data_str: str | None = item.raw_data
524 elif item.data is not None:
525 if hasattr(item.data, "model_dump_json"):
526 data_str = item.data.model_dump_json()
527 else:
528 data_str = json.dumps(jsonable_encoder(item.data))
529 else:
530 data_str = None
531 return format_sse_event(
532 data_str=data_str,
533 event=item.event,
534 id=item.id,
535 retry=item.retry,
536 comment=item.comment,
537 )
538 else:
539 # Plain object: validate + serialize via
540 # stream_item_field (if set) and wrap in data field
541 return format_sse_event(
542 data_str=_serialize_data(item).decode("utf-8")
543 )
544
545 if dependant.is_async_gen_callable:
546 sse_aiter: AsyncIterator[Any] = gen.__aiter__()

Callers 1

_producerFunction · 0.85

Calls 4

jsonable_encoderFunction · 0.90
format_sse_eventFunction · 0.90
_serialize_dataFunction · 0.85
dumpsMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…