(conn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory, gc)
| 70 | @pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"]) |
| 71 | @pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"]) |
| 72 | def test_leak(conn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory, gc): |
| 73 | faker.format = fmt |
| 74 | faker.choose_schema(ncols=5) |
| 75 | faker.make_records(10) |
| 76 | row_factory = getattr(rows, row_factory) |
| 77 | |
| 78 | def work(): |
| 79 | with conn_cls.connect(dsn) as conn, conn.transaction(force_rollback=True): |
| 80 | with conn.cursor(binary=fmt_out, row_factory=row_factory) as cur: |
| 81 | cur.execute(faker.drop_stmt) |
| 82 | cur.execute(faker.create_stmt) |
| 83 | with faker.find_insert_problem(conn): |
| 84 | cur.executemany(faker.insert_stmt, faker.records) |
| 85 | cur.execute(faker.select_stmt) |
| 86 | |
| 87 | if fetch == "one": |
| 88 | while cur.fetchone() is not None: |
| 89 | pass |
| 90 | elif fetch == "many": |
| 91 | while cur.fetchmany(3): |
| 92 | pass |
| 93 | elif fetch == "all": |
| 94 | cur.fetchall() |
| 95 | elif fetch == "iter": |
| 96 | for rec in cur: |
| 97 | pass |
| 98 | |
| 99 | n = [] |
| 100 | gc.collect() |
| 101 | for i in range(3): |
| 102 | work() |
| 103 | gc.collect() |
| 104 | n.append(gc.count()) |
| 105 | |
| 106 | assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}" |
nothing calls this directly
no test coverage detected