hub / github.com/psycopg/psycopg / test_copy_concurrency

Function test_copy_concurrency

tests/test_copy_async.py:971–1067

Test that copy operations hold the connection lock for the entire operation. This test verifies the fix for the concurrency issue where AsyncCursor.copy() was not holding the connection lock throughout the copy context, allowing concurrent operations to interfere.

(aconn)
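The locking behaviour described above can be illustrated without a database. The following is a minimal sketch, assuming a hypothetical `ToyConnection` class whose `asyncio.Lock` stands in for the connection lock; it is not psycopg's implementation, and every name in it is made up for illustration. It shows why a second operation cannot run while the lock is held for the whole `async with` body.

```python
import asyncio
from contextlib import asynccontextmanager


class ToyConnection:
    """Hypothetical stand-in for a connection; NOT psycopg's implementation."""

    def __init__(self):
        self.lock = asyncio.Lock()
        self.log = []

    @asynccontextmanager
    async def copy(self):
        # Hold the lock for the whole body of the caller's `async with`
        # block, not only while the copy is being started.
        async with self.lock:
            self.log.append("entered_copy")
            try:
                yield self
            finally:
                self.log.append("exited_copy")

    async def execute(self, label):
        # Any other operation has to acquire the same lock first.
        async with self.lock:
            self.log.append(label)


async def demo():
    conn = ToyConnection()
    entered = asyncio.Event()
    can_proceed = asyncio.Event()

    async def copy_task():
        async with conn.copy():
            entered.set()
            await can_proceed.wait()
            conn.log.append("wrote_row_1")

    async def worker_task():
        await conn.execute("worker_completed")

    t_copy = asyncio.create_task(copy_task())
    await entered.wait()
    t_worker = asyncio.create_task(worker_task())

    # Yield to the event loop so the worker gets a chance to run;
    # it should end up parked on the lock.
    await asyncio.sleep(0)
    await asyncio.sleep(0)
    blocked = "worker_completed" not in conn.log

    can_proceed.set()
    await asyncio.gather(t_copy, t_worker)
    return blocked, conn.log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this toy model the worker's entry can only appear in the log after `exited_copy`, which is the ordering the test checks for against a real connection.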

Source

async def test_copy_concurrency(aconn):
    """
    Test that copy operations hold the connection lock for the entire operation.

    This test verifies the fix for the concurrency issue where AsyncCursor.copy()
    was not holding the connection lock throughout the copy context, allowing
    concurrent operations to interfere.
    """
    await aconn.execute("create temp table copy_concurrency_test (id int, data text)")

    # Events to coordinate execution between copy task and workers
    copy_entered = AEvent()
    wrote_first = AEvent()
    wrote_second = AEvent()
    can_proceed = AEvent()

    # Track execution order to verify workers run after copy completes
    execution_log = []

    async def copy_task():
        """Copy task that writes two rows with controlled pauses."""
        cur = aconn.cursor()
        async with cur.copy("copy copy_concurrency_test from stdin") as copy:
            # Pause after entering copy context
            execution_log.append("entered_copy")
            copy_entered.set()
            await can_proceed.wait()

            # Write first row and pause
            await copy.write_row((1, "first"))
            execution_log.append("wrote_row_1")
            wrote_first.set()
            await can_proceed.wait()

            # Write second row and pause
            await copy.write_row((2, "second"))
            execution_log.append("wrote_row_2")
            wrote_second.set()
            await can_proceed.wait()

        # Copy context exited, lock should now be released
        execution_log.append("exited_copy")

    async def worker_task():
        """
        Worker that attempts to execute a query on a different cursor.
        Should block until copy completes due to connection lock.
        """
        # Try to execute on another cursor - this should block until copy exits
        worker_cur = aconn.cursor()
        await worker_cur.execute("select 1")
        execution_log.append("worker_completed")

    # Start the copy task
    t_copy = spawn(copy_task)

    # Wait for copy to enter, then spawn first worker
    await copy_entered.wait()
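The checkpoint pattern used in the listing (pause the lock holder on an event, start a worker, confirm the worker has not finished, then release the holder) can be sketched in isolation. This is an illustrative sketch using plain `asyncio` primitives, not the remainder of the test above; `holder`, `worker`, `step_done`, and the 0.05 s timeout are assumptions made for the example.

```python
import asyncio


async def demo():
    lock = asyncio.Lock()          # stands in for the connection lock
    step_done = asyncio.Event()    # holder -> driver: "I reached a checkpoint"
    can_proceed = asyncio.Event()  # driver -> holder: "continue"
    log = []

    async def holder():
        async with lock:
            for step in ("step_1", "step_2"):
                log.append(step)
                step_done.set()
                await can_proceed.wait()
                can_proceed.clear()  # re-arm the gate for the next checkpoint
        log.append("released")

    async def worker():
        async with lock:
            log.append("worker_completed")

    t_holder = asyncio.create_task(holder())
    t_worker = None

    for _ in range(2):
        await step_done.wait()
        step_done.clear()
        if t_worker is None:
            t_worker = asyncio.create_task(worker())
        # Give the worker some time; it must still be pending because
        # the holder has not left its `async with lock` block yet.
        done, _pending = await asyncio.wait({t_worker}, timeout=0.05)
        assert not done, "worker finished while the lock was held"
        can_proceed.set()

    await asyncio.gather(t_holder, t_worker)
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A timeout-based check like this can only show that the worker did not finish within the waiting window; the final ordering of the log is the stronger evidence that the lock was held throughout.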

Callers

nothing calls this directly

Calls 8

AEvent · Class · 0.70
spawn · Function · 0.70
gather · Function · 0.70
execute · Method · 0.45
wait · Method · 0.45
set · Method · 0.45
clear · Method · 0.45
fetchall · Method · 0.45

Tested by

no test coverage detected