MCPcopy
hub / github.com/psycopg/psycopg / test_leak

Function test_leak

tests/test_cursor_async.py:71–107  ·  view source on GitHub ↗
(aconn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory, gc)

Source from the content-addressed store, hash-verified

@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
async def test_leak(aconn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory, gc):
    """Check that repeating a full query round-trip does not change the gc count.

    The same workload (connect, create a table, insert the faker records,
    select them back and consume the result) is run three times. After each
    run the ``gc`` fixture is asked to collect and its ``count()`` is sampled;
    the three samples must be equal.

    Parameters used directly by this test:
        fmt: assigned to ``faker.format`` before the schema is chosen.
        fmt_out: passed as ``binary=`` when creating the cursor.
        fetch: which consumption strategy to exercise
            ("one", "many", "all" or "iter").
        row_factory: attribute name looked up on the ``rows`` module and
            passed to the cursor as its row factory.
    """
    faker.format = fmt
    faker.choose_schema(ncols=5)
    faker.make_records(10)
    row_factory = getattr(rows, row_factory)

    async def work():
        # One complete round-trip; the transaction is always rolled back.
        async with await aconn_cls.connect(dsn) as conn:
            async with conn.transaction(force_rollback=True):
                async with conn.cursor(
                    binary=fmt_out, row_factory=row_factory
                ) as cur:
                    await cur.execute(faker.drop_stmt)
                    await cur.execute(faker.create_stmt)
                    async with faker.find_insert_problem_async(conn):
                        await cur.executemany(faker.insert_stmt, faker.records)
                    await cur.execute(faker.select_stmt)

                    # Drain the result set using the requested strategy.
                    if fetch == "one":
                        while True:
                            if (await cur.fetchone()) is None:
                                break
                    elif fetch == "many":
                        batch = await cur.fetchmany(3)
                        while batch:
                            batch = await cur.fetchmany(3)
                    elif fetch == "all":
                        await cur.fetchall()
                    elif fetch == "iter":
                        async for _ in cur:
                            pass

    samples = []
    gc.collect()
    for _ in range(3):
        await work()
        gc.collect()
        samples.append(gc.count())

    first, second, third = samples
    assert (
        first == second == third
    ), f"objects leaked: {second - first}, {third - second}"

Callers

nothing calls this directly

Calls 5

choose_schema — Method · 0.80
make_records — Method · 0.80
collect — Method · 0.80
count — Method · 0.80
work — Function · 0.70

Tested by

no test coverage detected