(
sync: bool,
client: OpenAI,
async_client: AsyncOpenAI,
)
| 164 | |
| 165 | @pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) |
| 166 | async def test_special_new_line_character( |
| 167 | sync: bool, |
| 168 | client: OpenAI, |
| 169 | async_client: AsyncOpenAI, |
| 170 | ) -> None: |
| 171 | def body() -> Iterator[bytes]: |
| 172 | yield b'data: {"content":" culpa"}\n' |
| 173 | yield b"\n" |
| 174 | yield b'data: {"content":" \xe2\x80\xa8"}\n' |
| 175 | yield b"\n" |
| 176 | yield b'data: {"content":"foo"}\n' |
| 177 | yield b"\n" |
| 178 | |
| 179 | iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client) |
| 180 | |
| 181 | sse = await iter_next(iterator) |
| 182 | assert sse.event is None |
| 183 | assert sse.json() == {"content": " culpa"} |
| 184 | |
| 185 | sse = await iter_next(iterator) |
| 186 | assert sse.event is None |
| 187 | assert sse.json() == {"content": " |
| 188 | "} |
| 189 | |
| 190 | sse = await iter_next(iterator) |
| 191 | assert sse.event is None |
| 192 | assert sse.json() == {"content": "foo"} |
| 193 | |
| 194 | await assert_empty_iter(iterator) |
| 195 | |
| 196 |
nothing calls this directly
no test coverage detected