(aconn_cls, dsn, faker, fetch, row_factory, gc)
@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
async def test_leak(aconn_cls, dsn, faker, fetch, row_factory, gc):
    """Check that repeated client-cursor workloads leave the gc count unchanged.

    The same workload (connect, recreate the faker table, insert the faker
    records, select them back and consume the result with the parametrized
    ``fetch`` style) is run three times. After each run ``gc.collect()`` is
    called and ``gc.count()`` is recorded; the three values must be equal.

    ``row_factory`` arrives as the name of an attribute of ``rows`` and is
    resolved to that attribute before being handed to the cursor.
    """
    faker.choose_schema(ncols=5)
    faker.make_records(10)
    factory = getattr(rows, row_factory)

    async def drain(cursor):
        """Consume the pending result set using the requested fetch style."""
        if fetch == "one":
            row = await cursor.fetchone()
            while row is not None:
                row = await cursor.fetchone()
            return

        if fetch == "many":
            while True:
                batch = await cursor.fetchmany(3)
                if not batch:
                    break
            return

        if fetch == "all":
            await cursor.fetchall()
            return

        if fetch == "iter":
            async for _row in cursor:
                continue

    async def work():
        """Run one complete connect / populate / select / fetch cycle."""
        async with await aconn_cls.connect(dsn) as conn:
            # force_rollback=True: the transaction block is asked to roll back
            # on exit, so each run starts from the same database state.
            async with conn.transaction(force_rollback=True):
                async with psycopg.AsyncClientCursor(
                    conn, row_factory=factory
                ) as cursor:
                    await cursor.execute(faker.drop_stmt)
                    await cursor.execute(faker.create_stmt)
                    async with faker.find_insert_problem_async(conn):
                        await cursor.executemany(faker.insert_stmt, faker.records)
                    await cursor.execute(faker.select_stmt)
                    await drain(cursor)

    counts = []
    # Collect once up front so garbage created before the loop is not
    # attributed to the first run.
    gc.collect()
    for _ in range(3):
        await work()
        gc.collect()
        counts.append(gc.count())

    first, second, third = counts
    assert (
        first == second == third
    ), f"objects leaked: {second - first}, {third - second}"
| 118 | |
| 119 | |
| 120 | @pytest.mark.parametrize( |
nothing calls this directly
no test coverage detected