Build SSE wire-format bytes from **pre-serialized** data. The result always ends with `\n\n` (the event terminator).
(
*,
data_str: Annotated[
str | None,
Doc(
"""
Pre-serialized data string to use as the `data:` field.
"""
),
] = None,
event: Annotated[
str | None,
Doc(
"""
Optional event type name (`event:` field).
"""
),
] = None,
id: Annotated[
str | None,
Doc(
"""
Optional event ID (`id:` field).
"""
),
] = None,
retry: Annotated[
int | None,
Doc(
"""
Optional reconnection time in milliseconds (`retry:` field).
"""
),
] = None,
comment: Annotated[
str | None,
Doc(
"""
Optional comment line(s) (`:` prefix).
"""
),
] = None,
)
| 157 | |
| 158 | |
| 159 | def format_sse_event( |
| 160 | *, |
| 161 | data_str: Annotated[ |
| 162 | str | None, |
| 163 | Doc( |
| 164 | """ |
| 165 | Pre-serialized data string to use as the `data:` field. |
| 166 | """ |
| 167 | ), |
| 168 | ] = None, |
| 169 | event: Annotated[ |
| 170 | str | None, |
| 171 | Doc( |
| 172 | """ |
| 173 | Optional event type name (`event:` field). |
| 174 | """ |
| 175 | ), |
| 176 | ] = None, |
| 177 | id: Annotated[ |
| 178 | str | None, |
| 179 | Doc( |
| 180 | """ |
| 181 | Optional event ID (`id:` field). |
| 182 | """ |
| 183 | ), |
| 184 | ] = None, |
| 185 | retry: Annotated[ |
| 186 | int | None, |
| 187 | Doc( |
| 188 | """ |
| 189 | Optional reconnection time in milliseconds (`retry:` field). |
| 190 | """ |
| 191 | ), |
| 192 | ] = None, |
| 193 | comment: Annotated[ |
| 194 | str | None, |
| 195 | Doc( |
| 196 | """ |
| 197 | Optional comment line(s) (`:` prefix). |
| 198 | """ |
| 199 | ), |
| 200 | ] = None, |
| 201 | ) -> bytes: |
| 202 | """Build SSE wire-format bytes from **pre-serialized** data. |
| 203 | |
| 204 | The result always ends with `\n\n` (the event terminator). |
| 205 | """ |
| 206 | lines: list[str] = [] |
| 207 | |
| 208 | if comment is not None: |
| 209 | for line in comment.splitlines(): |
| 210 | lines.append(f": {line}") |
| 211 | |
| 212 | if event is not None: |
| 213 | lines.append(f"event: {event}") |
| 214 | |
| 215 | if data_str is not None: |
| 216 | for line in data_str.splitlines(): |
no outgoing calls
no test coverage detected
searching dependent graphs…