hub / github.com/psycopg/psycopg / work

Function work

tests/test_cursor_async.py:77–98
()

Source from the content-addressed store, hash-verified

    row_factory = getattr(rows, row_factory)

    async def work():
        async with await aconn_cls.connect(dsn) as conn, conn.transaction(
            force_rollback=True
        ):
            async with conn.cursor(binary=fmt_out, row_factory=row_factory) as cur:
                await cur.execute(faker.drop_stmt)
                await cur.execute(faker.create_stmt)
                async with faker.find_insert_problem_async(conn):
                    await cur.executemany(faker.insert_stmt, faker.records)
                await cur.execute(faker.select_stmt)

                if fetch == "one":
                    while (await cur.fetchone()) is not None:
                        pass
                elif fetch == "many":
                    while await cur.fetchmany(3):
                        pass
                elif fetch == "all":
                    await cur.fetchall()
                elif fetch == "iter":
                    async for rec in cur:
                        pass

    n = []
    gc.collect()
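
The listing ends at `gc.collect()`, so how `work()` is driven afterwards is not shown here. One common way to check a coroutine for leaks, sketched below under assumptions, is to run it several times, force a collection after each run, and compare the number of tracked objects. The names `gc_count`, `clean_work`, `leaky_work`, and `count_after_runs` are hypothetical and exist only for illustration; the workloads are in-memory stand-ins, not psycopg calls.

```python
import asyncio
import gc

# Hypothetical module-level list, used only to simulate a leak.
_retained: list = []


def gc_count() -> int:
    """Hypothetical helper: number of objects the collector currently tracks."""
    return len(gc.get_objects())


async def clean_work() -> None:
    # Stand-in for a database workload: temporary rows plus a reference
    # cycle, all unreachable once the coroutine returns.
    rows = [[i, f"row-{i}"] for i in range(100)]
    cycle = {"rows": rows}
    cycle["self"] = cycle
    await asyncio.sleep(0)


async def leaky_work() -> None:
    # Same kind of allocation, but the rows stay reachable through _retained.
    _retained.extend([i, f"row-{i}"] for i in range(100))
    await asyncio.sleep(0)


async def count_after_runs(work, runs: int = 3) -> list[int]:
    n = []
    gc.collect()
    for _ in range(runs):
        await work()
        gc.collect()  # reclaim cycles before counting
        n.append(gc_count())
    return n


clean = asyncio.run(count_after_runs(clean_work))
leaky = asyncio.run(count_after_runs(leaky_work))
print("clean:", clean)
print("leaky:", leaky)
```

If the counts stay flat across runs, the workload is probably not retaining objects; a steady increase suggests a leak. `gc.get_objects()` only reports container objects the collector tracks, so this is a heuristic rather than an exact measurement.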

Callers 1

test_leak · Function · 0.70

Calls 9

connect · Method · 0.45
transaction · Method · 0.45
cursor · Method · 0.45
execute · Method · 0.45
executemany · Method · 0.45
fetchone · Method · 0.45
fetchmany · Method · 0.45
fetchall · Method · 0.45

Tested by

no test coverage detected