hub / github.com/psycopg/psycopg / main_async

Function main_async

tests/scripts/copytest.py:49–61
(args: Namespace)

Source from the content-addressed store, hash-verified

async def main_async(args: Namespace) -> None:
    test = CopyPutTest(args)
    async with await psycopg.AsyncConnection.connect(args.dsn) as conn:
        async with conn.cursor() as cur:
            await cur.execute(test.get_table_stmt())
            writer = getattr(psycopg.copy, args.writer)(cur) if args.writer else None
            t0 = time()
            async with cur.copy(test.get_copy_stmt(), writer=writer) as copy:
                for i in range(args.nrecs):
                    await copy.write_row(test.get_record())
            tf = time()

    logger.info("time to copy: %.6f sec", tf - t0)
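The timing pattern used by main_async can be tried without a database. The following is a minimal sketch under stated assumptions, not part of copytest.py: FakeCopy, fake_copy, and timed_copy are hypothetical stand-ins for the object yielded by cur.copy() and for the benchmark loop, and they use only the standard library. The sketch reads the end time after the `async with` block exits, so any work a real copy object does on exit would be included in the measurement.

```python
import asyncio
from contextlib import asynccontextmanager
from time import time


class FakeCopy:
    """Hypothetical stand-in for the object yielded by cur.copy()."""

    def __init__(self) -> None:
        self.rows: list[tuple] = []

    async def write_row(self, row: tuple) -> None:
        # Yield to the event loop once, as a real network write might.
        await asyncio.sleep(0)
        self.rows.append(row)


@asynccontextmanager
async def fake_copy():
    # Assumption: a real copy context would finish the operation on exit;
    # this stand-in only hands out an in-memory collector.
    yield FakeCopy()


async def timed_copy(nrecs: int) -> tuple[int, float]:
    """Write nrecs rows and return (rows written, elapsed seconds)."""
    t0 = time()
    async with fake_copy() as copy:
        for i in range(nrecs):
            await copy.write_row((i, f"row {i}"))
    # Read the clock only after the block has exited.
    tf = time()
    return len(copy.rows), tf - t0


if __name__ == "__main__":
    count, elapsed = asyncio.run(timed_copy(1000))
    print(f"copied {count} rows in {elapsed:.6f} sec")
```

Swapping fake_copy() for a real cursor's copy() call, as the source above does, would leave the structure of the measurement unchanged.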

Callers 1

main · Function · 0.85

Calls 10

get_table_stmt · Method · 0.95
get_copy_stmt · Method · 0.95
get_record · Method · 0.95
CopyPutTest · Class · 0.85
connect · Method · 0.45
cursor · Method · 0.45
execute · Method · 0.45
copy · Method · 0.45
write_row · Method · 0.45
info · Method · 0.45

Tested by

no test coverage detected