Test that copy operations hold the connection lock for the entire operation. This test verifies the fix for the concurrency issue where Cursor.copy() was not holding the connection lock throughout the copy context, allowing concurrent operations to interfere.
(conn)
| 950 | |
| 951 | |
| 952 | def test_copy_concurrency(conn): |
| 953 | """ |
| 954 | Test that copy operations hold the connection lock for the entire operation. |
| 955 | |
| 956 | This test verifies the fix for the concurrency issue where Cursor.copy() |
| 957 | was not holding the connection lock throughout the copy context, allowing |
| 958 | concurrent operations to interfere. |
| 959 | """ |
| 960 | conn.execute("create temp table copy_concurrency_test (id int, data text)") |
| 961 | |
| 962 | # Events to coordinate execution between copy task and workers |
| 963 | copy_entered = Event() |
| 964 | wrote_first = Event() |
| 965 | wrote_second = Event() |
| 966 | can_proceed = Event() |
| 967 | |
| 968 | # Track execution order to verify workers run after copy completes |
| 969 | execution_log = [] |
| 970 | |
| 971 | def copy_task(): |
| 972 | """Copy task that writes two rows with controlled pauses.""" |
| 973 | cur = conn.cursor() |
| 974 | with cur.copy("copy copy_concurrency_test from stdin") as copy: |
| 975 | # Pause after entering copy context |
| 976 | execution_log.append("entered_copy") |
| 977 | copy_entered.set() |
| 978 | can_proceed.wait() |
| 979 | |
| 980 | # Write first row and pause |
| 981 | copy.write_row((1, "first")) |
| 982 | execution_log.append("wrote_row_1") |
| 983 | wrote_first.set() |
| 984 | can_proceed.wait() |
| 985 | |
| 986 | # Write second row and pause |
| 987 | copy.write_row((2, "second")) |
| 988 | execution_log.append("wrote_row_2") |
| 989 | wrote_second.set() |
| 990 | can_proceed.wait() |
| 991 | |
| 992 | # Copy context exited, lock should now be released |
| 993 | execution_log.append("exited_copy") |
| 994 | |
| 995 | def worker_task(): |
| 996 | """ |
| 997 | Worker that attempts to execute a query on a different cursor. |
| 998 | Should block until copy completes due to connection lock. |
| 999 | """ |
| 1000 | # Try to execute on another cursor - this should block until copy exits |
| 1001 | worker_cur = conn.cursor() |
| 1002 | worker_cur.execute("select 1") |
| 1003 | execution_log.append("worker_completed") |
| 1004 | |
| 1005 | # Start the copy task |
| 1006 | t_copy = spawn(copy_task) |
| 1007 | |
| 1008 | # Wait for copy to enter, then spawn first worker |
| 1009 | copy_entered.wait() |