MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / test_twophase

Method test_twophase

test/orm/test_transaction.py:436–462  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

434
435 @testing.requires.two_phase_transactions
436 def test_twophase(self):
437 users, Address, addresses, User = (
438 self.tables.users,
439 self.classes.Address,
440 self.tables.addresses,
441 self.classes.User,
442 )
443
444 # TODO: mock up a failure condition here
445 # to ensure a rollback succeeds
446 self.mapper_registry.map_imperatively(User, users)
447 self.mapper_registry.map_imperatively(Address, addresses)
448
449 engine2 = engines.testing_engine()
450 sess = fixture_session(autoflush=False, twophase=True)
451 sess.bind_mapper(User, testing.db)
452 sess.bind_mapper(Address, engine2)
453 sess.begin()
454 u1 = User(name="u1")
455 a1 = Address(email_address="u1@e")
456 sess.add_all((u1, a1))
457 sess.commit()
458 sess.close()
459 engine2.dispose()
460 with testing.db.connect() as conn:
461 eq_(conn.scalar(select(func.count("*")).select_from(users)), 1)
462 eq_(conn.scalar(select(func.count("*")).select_from(addresses)), 1)
463
464 @testing.requires.independent_connections
465 def test_invalidate(self):

Callers

nothing calls this directly

Calls 15

fixture_sessionFunction · 0.90
eq_Function · 0.90
selectFunction · 0.90
map_imperativelyMethod · 0.80
testing_engineMethod · 0.80
UserClass · 0.70
AddressClass · 0.70
bind_mapperMethod · 0.45
beginMethod · 0.45
add_allMethod · 0.45
commitMethod · 0.45
closeMethod · 0.45

Tested by

no test coverage detected