MCPcopy
hub / github.com/psycopg/psycopg / test_copy_concurrency

Function test_copy_concurrency

tests/test_copy.py:952–1048  ·  view source on GitHub ↗

Test that copy operations hold the connection lock for the entire operation. This test verifies the fix for the concurrency issue where Cursor.copy() was not holding the connection lock throughout the copy context, allowing concurrent operations to interfere.

(conn)

Source from the content-addressed store, hash-verified

950
951
952def test_copy_concurrency(conn):
953 """
954 Test that copy operations hold the connection lock for the entire operation.
955
956 This test verifies the fix for the concurrency issue where Cursor.copy()
957 was not holding the connection lock throughout the copy context, allowing
958 concurrent operations to interfere.
959 """
960 conn.execute("create temp table copy_concurrency_test (id int, data text)")
961
962 # Events to coordinate execution between copy task and workers
963 copy_entered = Event()
964 wrote_first = Event()
965 wrote_second = Event()
966 can_proceed = Event()
967
968 # Track execution order to verify workers run after copy completes
969 execution_log = []
970
971 def copy_task():
972 """Copy task that writes two rows with controlled pauses."""
973 cur = conn.cursor()
974 with cur.copy("copy copy_concurrency_test from stdin") as copy:
975 # Pause after entering copy context
976 execution_log.append("entered_copy")
977 copy_entered.set()
978 can_proceed.wait()
979
980 # Write first row and pause
981 copy.write_row((1, "first"))
982 execution_log.append("wrote_row_1")
983 wrote_first.set()
984 can_proceed.wait()
985
986 # Write second row and pause
987 copy.write_row((2, "second"))
988 execution_log.append("wrote_row_2")
989 wrote_second.set()
990 can_proceed.wait()
991
992 # Copy context exited, lock should now be released
993 execution_log.append("exited_copy")
994
995 def worker_task():
996 """
997 Worker that attempts to execute a query on a different cursor.
998 Should block until copy completes due to connection lock.
999 """
1000 # Try to execute on another cursor - this should block until copy exits
1001 worker_cur = conn.cursor()
1002 worker_cur.execute("select 1")
1003 execution_log.append("worker_completed")
1004
1005 # Start the copy task
1006 t_copy = spawn(copy_task)
1007
1008 # Wait for copy to enter, then spawn first worker
1009 copy_entered.wait()

Callers

nothing calls this directly

Calls 7

spawnFunction · 0.70
gatherFunction · 0.70
executeMethod · 0.45
waitMethod · 0.45
setMethod · 0.45
clearMethod · 0.45
fetchallMethod · 0.45

Tested by

no test coverage detected