(sync: bool, client: Anthropic, async_client: AsyncAnthropic)
@pytest.mark.asyncio
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
async def test_basic(sync: bool, client: Anthropic, async_client: AsyncAnthropic) -> None:
    """Check that one ``event:``/``data:`` pair plus a blank line yields exactly one event.

    The same byte stream is pushed through ``make_event_iterator`` once with
    ``sync=True`` and once with ``sync=False`` (see the parametrization).
    The single event must report ``event == "completion"`` and decode its
    data to ``{"foo": True}``; afterwards the iterator must be exhausted.
    """

    def chunks() -> Iterator[bytes]:
        # Three separate chunks: the event line, the data line, and the
        # blank line that ends the event.
        yield from (
            b"event: completion\n",
            b'data: {"foo":true}\n',
            b"\n",
        )

    events = make_event_iterator(
        content=chunks(),
        sync=sync,
        client=client,
        async_client=async_client,
    )

    first = await iter_next(events)
    assert first.event == "completion"
    assert first.json() == {"foo": True}

    # Nothing further should be produced after the single event.
    await assert_empty_iter(events)
| 30 | |
| 31 | |
| 32 | @pytest.mark.asyncio |
nothing calls this directly
no test coverage detected