hub / github.com/psycopg/psycopg / test_copy_in_format

Function test_copy_in_format

tests/test_copy_async.py:589–609
(aconn)

Source from the content-addressed store, hash-verified

async def test_copy_in_format(aconn):
    file = BytesIO()
    await aconn.execute("set client_encoding to utf8")
    cur = aconn.cursor()
    async with AsyncCopy(cur, writer=AsyncFileWriter(file)) as copy:
        for i in range(1, 256):
            await copy.write_row((i, chr(i)))

    file.seek(0)
    rows = file.read().split(b"\n")
    assert not rows[-1]
    del rows[-1]

    for i, row in enumerate(rows, start=1):
        fields = row.split(b"\t")
        assert len(fields) == 2
        assert int(fields[0].decode()) == i
        if i in special_chars:
            assert fields[1].decode() == f"\\{special_chars[i]}"
        else:
            assert fields[1].decode() == chr(i)
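The assertions above rely on the escaping rules of PostgreSQL's text COPY format: fields are separated by tabs, rows end with a newline, and a handful of control characters plus the backslash are written as backslash escapes. The following is a minimal, database-free sketch of that formatting, not psycopg's implementation. `SPECIAL_CHARS`, `escape_copy_text`, and `format_row` are hypothetical names, and the mapping is an assumption about what the test's `special_chars` contains, since its definition is not shown on this page.

```python
from io import BytesIO

# Assumed mapping of code point -> escape letter. The test's own
# `special_chars` is defined elsewhere; this mirrors the escapes that
# PostgreSQL's text COPY format documents (\b, \t, \n, \v, \f, \r, \\).
SPECIAL_CHARS = {8: "b", 9: "t", 10: "n", 11: "v", 12: "f", 13: "r", ord("\\"): "\\"}


def escape_copy_text(value: str) -> str:
    """Escape one field for text COPY format (hypothetical helper)."""
    return "".join(
        f"\\{SPECIAL_CHARS[ord(ch)]}" if ord(ch) in SPECIAL_CHARS else ch
        for ch in value
    )


def format_row(row) -> bytes:
    """Join escaped fields with tabs and terminate the row with a newline."""
    line = "\t".join(escape_copy_text(str(field)) for field in row)
    return line.encode("utf-8") + b"\n"


# Reproduce the data the test writes: (i, chr(i)) for i in 1..255.
buf = BytesIO()
for i in range(1, 256):
    buf.write(format_row((i, chr(i))))

# Same parsing as the test: split on raw newlines; the last item is empty.
rows = buf.getvalue().split(b"\n")
```

Because raw tabs and newlines inside a field are always escaped, splitting the output on `b"\n"` and then `b"\t"` recovers exactly one row per input tuple and two fields per row, which is what the test checks.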

Callers

nothing calls this directly

Calls 6

AsyncCopy · Class · 0.90
AsyncFileWriter · Class · 0.85
execute · Method · 0.45
cursor · Method · 0.45
write_row · Method · 0.45
read · Method · 0.45

Tested by

no test coverage detected