(aconn_cls, dsn, faker, fmt, set_types, gc)
| 202 | ) |
| 203 | @pytest.mark.crdb_skip("copy array") |
| 204 | async def test_copy_from_leaks(aconn_cls, dsn, faker, fmt, set_types, gc): |
| 205 | faker.format = PyFormat.from_pq(fmt) |
| 206 | faker.choose_schema(ncols=20) |
| 207 | faker.make_records(20) |
| 208 | |
| 209 | async def work(): |
| 210 | async with await aconn_cls.connect(dsn) as conn: |
| 211 | async with conn.cursor(binary=fmt) as cur: |
| 212 | await cur.execute(faker.drop_stmt) |
| 213 | await cur.execute(faker.create_stmt) |
| 214 | |
| 215 | stmt = sql.SQL("copy {} ({}) from stdin {}").format( |
| 216 | faker.table_name, |
| 217 | sql.SQL(", ").join(faker.fields_names), |
| 218 | sql.SQL("with binary" if fmt else ""), |
| 219 | ) |
| 220 | async with cur.copy(stmt) as copy: |
| 221 | if set_types: |
| 222 | copy.set_types(faker.types_names) |
| 223 | for row in faker.records: |
| 224 | await copy.write_row(row) |
| 225 | |
| 226 | await cur.execute(faker.select_stmt) |
| 227 | recs = await cur.fetchall() |
| 228 | |
| 229 | for got, want in zip(recs, faker.records): |
| 230 | faker.assert_record(got, want) |
| 231 | |
| 232 | gc.collect() |
| 233 | n = [] |
| 234 | for i in range(3): |
| 235 | await work() |
| 236 | gc.collect() |
| 237 | n.append(gc.count()) |
| 238 | |
| 239 | assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}" |
| 240 | |
| 241 | |
| 242 | def copyopt(format): |
nothing calls this directly
no test coverage detected