Test that copy operations hold the connection lock for the entire operation. This test verifies the fix for the concurrency issue where AsyncCursor.copy() was not holding the connection lock throughout the copy context, allowing concurrent operations to interfere.
(aconn)
| 969 | |
| 970 | |
| 971 | async def test_copy_concurrency(aconn): |
| 972 | """ |
| 973 | Test that copy operations hold the connection lock for the entire operation. |
| 974 | |
| 975 | This test verifies the fix for the concurrency issue where AsyncCursor.copy() |
| 976 | was not holding the connection lock throughout the copy context, allowing |
| 977 | concurrent operations to interfere. |
| 978 | """ |
| 979 | await aconn.execute("create temp table copy_concurrency_test (id int, data text)") |
| 980 | |
| 981 | # Events to coordinate execution between copy task and workers |
| 982 | copy_entered = AEvent() |
| 983 | wrote_first = AEvent() |
| 984 | wrote_second = AEvent() |
| 985 | can_proceed = AEvent() |
| 986 | |
| 987 | # Track execution order to verify workers run after copy completes |
| 988 | execution_log = [] |
| 989 | |
| 990 | async def copy_task(): |
| 991 | """Copy task that writes two rows with controlled pauses.""" |
| 992 | cur = aconn.cursor() |
| 993 | async with cur.copy("copy copy_concurrency_test from stdin") as copy: |
| 994 | # Pause after entering copy context |
| 995 | execution_log.append("entered_copy") |
| 996 | copy_entered.set() |
| 997 | await can_proceed.wait() |
| 998 | |
| 999 | # Write first row and pause |
| 1000 | await copy.write_row((1, "first")) |
| 1001 | execution_log.append("wrote_row_1") |
| 1002 | wrote_first.set() |
| 1003 | await can_proceed.wait() |
| 1004 | |
| 1005 | # Write second row and pause |
| 1006 | await copy.write_row((2, "second")) |
| 1007 | execution_log.append("wrote_row_2") |
| 1008 | wrote_second.set() |
| 1009 | await can_proceed.wait() |
| 1010 | |
| 1011 | # Copy context exited, lock should now be released |
| 1012 | execution_log.append("exited_copy") |
| 1013 | |
| 1014 | async def worker_task(): |
| 1015 | """ |
| 1016 | Worker that attempts to execute a query on a different cursor. |
| 1017 | Should block until copy completes due to connection lock. |
| 1018 | """ |
| 1019 | # Try to execute on another cursor - this should block until copy exits |
| 1020 | worker_cur = aconn.cursor() |
| 1021 | await worker_cur.execute("select 1") |
| 1022 | execution_log.append("worker_completed") |
| 1023 | |
| 1024 | # Start the copy task |
| 1025 | t_copy = spawn(copy_task) |
| 1026 | |
| 1027 | # Wait for copy to enter, then spawn first worker |
| 1028 | await copy_entered.wait() |