| 476 | |
@pytest.mark.parametrize("what", ["commit", "rollback", "error"])
def test_transaction_concurrency(conn, what):
    """Check the error raised when two threads interleave `conn.transaction()`.

    Two worker threads share `conn`. The first enters a transaction block and
    waits inside it until the second has entered its own block; the first then
    leaves its block while the second is still inside. Each worker expects a
    `ProgrammingError` whose message mentions "transaction commit" or
    "transaction rollback", depending on how the block is left (`what`).

    Exceptions raised in the workers (including failed assertions) are
    collected and re-raised in the main thread: an exception escaping a
    thread's target would otherwise not fail the test by itself.
    """
    conn.autocommit = True

    evs = [threading.Event() for _ in range(3)]

    # Exceptions raised inside the worker threads, re-raised at the end.
    errors = []

    def check(unlock, wait_on):
        with pytest.raises(e.ProgrammingError) as ex:
            with conn.transaction():
                # Signal that this block is entered, then hold it open until
                # the test allows it to exit.
                unlock.set()
                wait_on.wait()
                conn.execute("select 1")

                if what == "error":
                    1 / 0
                elif what == "rollback":
                    raise psycopg.Rollback()
                else:
                    assert what == "commit"

        if what == "error":
            assert "transaction rollback" in str(ex.value)
            assert isinstance(ex.value.__context__, ZeroDivisionError)
        elif what == "rollback":
            assert "transaction rollback" in str(ex.value)
            assert isinstance(ex.value.__context__, psycopg.Rollback)
        else:
            assert "transaction commit" in str(ex.value)

    def worker(unlock, wait_on):
        try:
            check(unlock, wait_on)
        # BaseException on purpose: pytest's "DID NOT RAISE" failure does not
        # derive from Exception.
        except BaseException as exc:
            errors.append(exc)
        finally:
            # Already set on the normal path; setting it again is harmless.
            # If the worker failed before reaching `unlock.set()`, this keeps
            # whoever waits on the event from blocking forever.
            unlock.set()

    # Start a first transaction in a thread
    t1 = threading.Thread(target=worker, kwargs={"unlock": evs[0], "wait_on": evs[1]})
    t1.start()
    evs[0].wait()

    # Start a nested transaction in a thread
    t2 = threading.Thread(target=worker, kwargs={"unlock": evs[1], "wait_on": evs[2]})
    t2.start()

    # Terminate the first transaction before the second does
    t1.join()
    evs[2].set()
    t2.join()

    # Surface the first failure recorded by the workers, if any.
    if errors:
        raise errors[0]
| 519 | |
| 520 | |
| 521 | @pytest.mark.slow |