(conn, format, buffer)
| 218 | [(pq.Format.TEXT, "sample_text"), (pq.Format.BINARY, "sample_binary")], |
| 219 | ) |
def test_copy_in_buffers(conn, format, buffer):
    """Write a whole sample buffer via COPY FROM STDIN and read it back.

    ``buffer`` is the *name* of a module-level object holding the payload;
    it is looked up in ``globals()`` so the parametrization can refer to
    the sample data by name. ``format`` supplies the COPY format through
    its ``.name`` attribute.
    """
    cursor = conn.cursor()
    # presumably (re)creates the copy_in table -- helper is defined elsewhere
    ensure_table(cursor, sample_tabledef)

    statement = f"copy copy_in from stdin (format {format.name})"
    with cursor.copy(statement) as copy:
        # Resolve the payload inside the copy context, as the test always did.
        payload = globals()[buffer]
        copy.write(payload)

    cursor.execute("select * from copy_in order by 1")
    rows = cursor.fetchall()
    assert rows == sample_records
| 229 | |
| 230 | |
| 231 | def test_copy_in_buffers_pg_error(conn): |