hub / github.com/psycopg/psycopg / test_copy_out_iter

Function test_copy_out_iter

tests/test_copy_async.py:55–68
(aconn, format, row_factory)

Source from the content-addressed store, hash-verified

@pytest.mark.parametrize("format", pq.Format)
@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
async def test_copy_out_iter(aconn, format, row_factory):
    if format == pq.Format.TEXT:
        want = [row + b"\n" for row in sample_text.splitlines()]
    else:
        want = sample_binary_rows

    rf = getattr(psycopg.rows, row_factory)
    cur = aconn.cursor(row_factory=rf)
    async with cur.copy(
        f"copy ({sample_values}) to stdout (format {format.name})"
    ) as copy:
        assert await alist(copy) == want

    assert aconn.info.transaction_status == pq.TransactionStatus.INTRANS
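
In the TEXT branch, the expected rows are built by splitting a bytes sample into lines and re-appending the newline that splitlines() drops, so each expected item ends with the row terminator. A minimal sketch of that step follows; the value of sample_text here is a hypothetical stand-in, not the fixture defined in the test module.

```python
# Hypothetical sample: two tab-separated rows in COPY text format.
# The real `sample_text` fixture lives in the test suite and may differ.
sample_text = b"10\t20\thello\n40\t\\N\tworld\n"

# splitlines() strips the trailing newline from each row, so it is
# added back to give one newline-terminated bytes item per row.
want = [row + b"\n" for row in sample_text.splitlines()]

for row in want:
    print(row)
```

Joining the items of want reproduces the original sample byte for byte, which is why the list can be compared directly against what the copy iteration yields.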

Callers

nothing calls this directly

Calls 3

alist · Function · 0.85
cursor · Method · 0.45
copy · Method · 0.45
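
The alist helper is not shown on this page. A minimal sketch, assuming it simply collects every item of an async iterable into a list, could look like the following; fake_copy_rows is a hypothetical stand-in for the async copy object, so the example runs without a database.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, List, TypeVar

T = TypeVar("T")


async def alist(it: AsyncIterable[T]) -> List[T]:
    """Collect every item of an async iterable into a list (assumed behavior)."""
    return [item async for item in it]


async def fake_copy_rows() -> AsyncIterator[bytes]:
    # Hypothetical stand-in for iterating an async copy object:
    # yields one newline-terminated row at a time.
    yield b"10\t20\thello\n"
    yield b"40\t\\N\tworld\n"


rows = asyncio.run(alist(fake_copy_rows()))
print(rows)
```

With a helper of this shape, `assert await alist(copy) == want` compares the full sequence of yielded rows against the expected list in a single expression.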

Tested by

no test coverage detected