(conn_cls, dsn, faker, mode)
@pytest.mark.slow
@pytest.mark.parametrize("mode", ["row", "block", "binary"])
def test_copy_table_across(conn_cls, dsn, faker, mode):
    """Stream a table from one connection into another using COPY.

    A source table ``copy_src`` is created and populated on the first
    connection, an empty ``copy_tgt`` with the same schema is created on the
    second one, and the content is piped from ``COPY ... TO STDOUT`` into
    ``COPY ... FROM STDIN``. The target table is then read back and compared
    with the records that were generated.

    :param conn_cls: class whose ``connect()`` is used to open both
        connections.
    :param dsn: connection string passed to ``connect()``.
    :param faker: helper providing the schema, the generated records, and
        the drop/create/insert/select statements for ``faker.table_name``.
    :param mode: ``"row"`` transfers via ``copy1.rows()`` and
        ``copy2.write_row()``; any other value forwards the chunks obtained
        by iterating ``copy1`` to ``copy2.write()``. ``"binary"``
        additionally adds ``(format binary)`` to both COPY statements.
    """
    faker.choose_schema(ncols=20)
    faker.make_records(20)

    connect = conn_cls.connect
    with connect(dsn) as conn1, connect(dsn) as conn2:
        # The faker statements are used after retargeting table_name, so the
        # same helper serves both the source and the target table.
        faker.table_name = sql.Identifier("copy_src")
        conn1.execute(faker.drop_stmt)
        conn1.execute(faker.create_stmt)
        conn1.cursor().executemany(faker.insert_stmt, faker.records)

        faker.table_name = sql.Identifier("copy_tgt")
        conn2.execute(faker.drop_stmt)
        conn2.execute(faker.create_stmt)

        fmt = "(format binary)" if mode == "binary" else ""
        with conn1.cursor().copy(f"copy copy_src to stdout {fmt}") as copy1:
            with conn2.cursor().copy(f"copy copy_tgt from stdin {fmt}") as copy2:
                if mode == "row":
                    for row in copy1.rows():
                        copy2.write_row(row)
                else:
                    for data in copy1:
                        copy2.write(data)

        # faker.table_name still points at copy_tgt here, so select_stmt
        # reads back what was written through conn2.
        cur = conn2.execute(faker.select_stmt)
        recs = cur.fetchall()

        # zip() stops at the shorter input: without this check a copy that
        # transferred no rows (or only some of them) would pass vacuously.
        assert len(recs) == len(faker.records)
        for got, want in zip(recs, faker.records):
            faker.assert_record(got, want)
| 950 | |
| 951 | |
| 952 | def test_copy_concurrency(conn): |
nothing calls this directly
no test coverage detected