(aconn_cls, dsn, faker, fmt, set_types, gc)
| 898 | [(pq.Format.TEXT, True), (pq.Format.TEXT, False), (pq.Format.BINARY, True)], |
| 899 | ) |
| 900 | async def test_copy_from_leaks(aconn_cls, dsn, faker, fmt, set_types, gc): |
| 901 | faker.format = PyFormat.from_pq(fmt) |
| 902 | faker.choose_schema(ncols=20) |
| 903 | faker.make_records(20) |
| 904 | |
| 905 | async def work(): |
| 906 | async with await aconn_cls.connect(dsn) as conn: |
| 907 | async with conn.cursor(binary=fmt) as cur: |
| 908 | await cur.execute(faker.drop_stmt) |
| 909 | await cur.execute(faker.create_stmt) |
| 910 | |
| 911 | stmt = sql.SQL("copy {} ({}) from stdin (format {})").format( |
| 912 | faker.table_name, |
| 913 | sql.SQL(", ").join(faker.fields_names), |
| 914 | sql.SQL(fmt.name), |
| 915 | ) |
| 916 | async with cur.copy(stmt) as copy: |
| 917 | if set_types: |
| 918 | copy.set_types(faker.types_names) |
| 919 | for row in faker.records: |
| 920 | await copy.write_row(row) |
| 921 | |
| 922 | await cur.execute(faker.select_stmt) |
| 923 | recs = await cur.fetchall() |
| 924 | |
| 925 | for got, want in zip(recs, faker.records): |
| 926 | faker.assert_record(got, want) |
| 927 | |
| 928 | gc.collect() |
| 929 | n = [] |
| 930 | for i in range(3): |
| 931 | await work() |
| 932 | gc.collect() |
| 933 | n.append(gc.count()) |
| 934 | |
| 935 | assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}" |
| 936 | |
| 937 | |
| 938 | @pytest.mark.slow |
nothing calls this directly
no test coverage detected