MCPcopy
hub / github.com/psycopg/psycopg / test_leak

Function test_leak

tests/test_cursor_client_async.py:82–117  ·  view source on GitHub ↗
(aconn_cls, dsn, faker, fetch, row_factory, gc)

Source from the content-addressed store, hash-verified

@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
async def test_leak(aconn_cls, dsn, faker, fetch, row_factory, gc):
    """Check that repeated client-cursor round trips do not grow the gc count.

    The same unit of work (connect, create a table, insert the faker
    records, select them back and consume the result with the ``fetch``
    strategy) is run three times.  After each run a collection is forced
    and the object count reported by the ``gc`` fixture is recorded; the
    three counts must be identical.
    """
    faker.choose_schema(ncols=5)
    faker.make_records(10)
    # The parameter arrives as a name; resolve it to the factory in `rows`.
    row_factory = getattr(rows, row_factory)

    async def consume(cur):
        # Exhaust the result set using the strategy selected by `fetch`.
        if fetch == "one":
            while True:
                if (await cur.fetchone()) is None:
                    break
        elif fetch == "many":
            while await cur.fetchmany(3):
                continue
        elif fetch == "all":
            await cur.fetchall()
        elif fetch == "iter":
            async for _ in cur:
                pass

    async def work():
        # Everything created here must be collectable once work() returns.
        async with await aconn_cls.connect(dsn) as conn:
            async with conn.transaction(force_rollback=True):
                async with psycopg.AsyncClientCursor(
                    conn, row_factory=row_factory
                ) as cur:
                    await cur.execute(faker.drop_stmt)
                    await cur.execute(faker.create_stmt)
                    async with faker.find_insert_problem_async(conn):
                        await cur.executemany(faker.insert_stmt, faker.records)
                    await cur.execute(faker.select_stmt)

                    await consume(cur)

    n = []
    # Start from a clean slate before taking any measurement.
    gc.collect()
    for _ in range(3):
        await work()
        gc.collect()
        n.append(gc.count())

    first, second, third = n
    assert first == second == third, (
        f"objects leaked: {second - first}, {third - second}"
    )
118
119
120@pytest.mark.parametrize(

Callers

nothing calls this directly

Calls 5

choose_schema — Method · 0.80
make_records — Method · 0.80
collect — Method · 0.80
count — Method · 0.80
work — Function · 0.70

Tested by

no test coverage detected