(aconn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory, gc)
| 69 | @pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"]) |
| 70 | @pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"]) |
| 71 | async def test_leak(aconn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory, gc): |
| 72 | faker.format = fmt |
| 73 | faker.choose_schema(ncols=5) |
| 74 | faker.make_records(10) |
| 75 | row_factory = getattr(rows, row_factory) |
| 76 | |
| 77 | async def work(): |
| 78 | async with await aconn_cls.connect(dsn) as conn, conn.transaction( |
| 79 | force_rollback=True |
| 80 | ): |
| 81 | async with conn.cursor(binary=fmt_out, row_factory=row_factory) as cur: |
| 82 | await cur.execute(faker.drop_stmt) |
| 83 | await cur.execute(faker.create_stmt) |
| 84 | async with faker.find_insert_problem_async(conn): |
| 85 | await cur.executemany(faker.insert_stmt, faker.records) |
| 86 | await cur.execute(faker.select_stmt) |
| 87 | |
| 88 | if fetch == "one": |
| 89 | while (await cur.fetchone()) is not None: |
| 90 | pass |
| 91 | elif fetch == "many": |
| 92 | while await cur.fetchmany(3): |
| 93 | pass |
| 94 | elif fetch == "all": |
| 95 | await cur.fetchall() |
| 96 | elif fetch == "iter": |
| 97 | async for rec in cur: |
| 98 | pass |
| 99 | |
| 100 | n = [] |
| 101 | gc.collect() |
| 102 | for i in range(3): |
| 103 | await work() |
| 104 | gc.collect() |
| 105 | n.append(gc.count()) |
| 106 | |
| 107 | assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}" |
nothing calls this directly
no test coverage detected