MCPcopy
hub / github.com/psycopg/psycopg / test_transaction_concurrency

Function test_transaction_concurrency

tests/test_concurrency.py:478–518  ·  view source on GitHub ↗
(conn, what)

Source from the content-addressed store, hash-verified

476
@pytest.mark.parametrize("what", ["commit", "rollback", "error"])
def test_transaction_concurrency(conn, what):
    """Check the error raised when two threads interleave transaction blocks.

    Two worker threads share ``conn``. The second one enters its
    ``conn.transaction()`` block while the first is still inside its own,
    and the first is allowed to leave its block before the second does.
    Both workers are expected to fail with ``ProgrammingError``; the message
    must mention the operation being attempted ("transaction commit" or
    "transaction rollback") and, for the ``error`` and ``rollback`` cases,
    the exception that triggered the exit must be chained as ``__context__``.

    ``what`` selects how each worker leaves its block: falling off the end
    (``commit``), raising ``psycopg.Rollback`` (``rollback``), or raising
    ``ZeroDivisionError`` (``error``).
    """
    conn.autocommit = True

    # Hand-off signals, in the order they fire:
    #   first_entered  - worker 1 is inside its transaction block
    #   second_entered - worker 2 is inside its block; lets worker 1 go on
    #   release_second - set by the main thread once worker 1 has finished
    first_entered, second_entered, release_second = (
        threading.Event() for _ in range(3)
    )

    def worker(unlock, wait_on):
        with pytest.raises(e.ProgrammingError) as excinfo:
            with conn.transaction():
                unlock.set()
                wait_on.wait()
                conn.execute("select 1")

                if what == "error":
                    1 / 0
                elif what == "rollback":
                    raise psycopg.Rollback()
                else:
                    assert what == "commit"

        failure = excinfo.value

        # Pick the exception expected to be chained to the failure; the
        # commit path has no triggering exception, so only the message
        # is checked there.
        if what == "error":
            expected_context = ZeroDivisionError
        elif what == "rollback":
            expected_context = psycopg.Rollback
        else:
            assert "transaction commit" in str(failure)
            return

        assert "transaction rollback" in str(failure)
        assert isinstance(failure.__context__, expected_context)

    # Start a first transaction in a thread and wait until it is entered
    first = threading.Thread(target=worker, args=(first_entered, second_entered))
    first.start()
    first_entered.wait()

    # Start a nested transaction in a second thread
    second = threading.Thread(target=worker, args=(second_entered, release_second))
    second.start()

    # Let the first transaction terminate before the second one does
    first.join()
    release_second.set()
    second.join()
519
520
521@pytest.mark.slow

Callers

nothing calls this directly

Calls 4

startMethod · 0.45
waitMethod · 0.45
joinMethod · 0.45
setMethod · 0.45

Tested by

no test coverage detected