MCPcopy
hub / github.com/psycopg/psycopg / test_copy_table_across

Function test_copy_table_across

tests/test_copy_async.py:940–968  ·  view source on GitHub ↗
(aconn_cls, dsn, faker, mode)

Source from the content-addressed store, hash-verified

938@pytest.mark.slow
939@pytest.mark.parametrize("mode", ["row", "block", "binary"])
940async def test_copy_table_across(aconn_cls, dsn, faker, mode):
941 faker.choose_schema(ncols=20)
942 faker.make_records(20)
943
944 connect = aconn_cls.connect
945 async with await connect(dsn) as conn1, await connect(dsn) as conn2:
946 faker.table_name = sql.Identifier("copy_src")
947 await conn1.execute(faker.drop_stmt)
948 await conn1.execute(faker.create_stmt)
949 await conn1.cursor().executemany(faker.insert_stmt, faker.records)
950
951 faker.table_name = sql.Identifier("copy_tgt")
952 await conn2.execute(faker.drop_stmt)
953 await conn2.execute(faker.create_stmt)
954
955 fmt = "(format binary)" if mode == "binary" else ""
956 async with conn1.cursor().copy(f"copy copy_src to stdout {fmt}") as copy1:
957 async with conn2.cursor().copy(f"copy copy_tgt from stdin {fmt}") as copy2:
958 if mode == "row":
959 async for row in copy1.rows():
960 await copy2.write_row(row)
961 else:
962 async for data in copy1:
963 await copy2.write(data)
964
965 cur = await conn2.execute(faker.select_stmt)
966 recs = await cur.fetchall()
967 for got, want in zip(recs, faker.records):
968 faker.assert_record(got, want)
969
970
971async def test_copy_concurrency(aconn):

Callers

nothing calls this directly

Calls 11

choose_schemaMethod · 0.80
make_recordsMethod · 0.80
assert_recordMethod · 0.80
executeMethod · 0.45
executemanyMethod · 0.45
cursorMethod · 0.45
copyMethod · 0.45
rowsMethod · 0.45
write_rowMethod · 0.45
writeMethod · 0.45
fetchallMethod · 0.45

Tested by

no test coverage detected