(self)
| 533 | |
@engines.close_open_connections
def test_bound_connection(self):
    """Exercise a Session whose ``bind`` is a single Connection.

    With a transaction begun and one ``User`` flushed, the session
    transaction must hand back the bound Connection whether it is asked
    for the engine or for the Connection itself.  Asking for a second,
    freshly opened Connection must raise ``InvalidRequestError``.  After
    the transaction is rolled back, no ``User`` rows are returned.
    """
    users = self.tables.users
    User = self.classes.User

    self.mapper_registry.map_imperatively(User, users)

    conn = testing.db.connect()
    session = Session(bind=conn)
    session.begin()
    trans = session.get_transaction()

    # Flush a pending object so the transaction has done work on the bind.
    session.add(User(name="u1"))
    session.flush()

    # Both lookups must resolve to the exact Connection given as bind.
    via_engine = trans._connection_for_bind(testing.db, None)
    via_connection = trans._connection_for_bind(conn, None)
    assert via_engine is via_connection
    assert via_connection is conn

    # A different Connection is rejected by the session transaction.
    assert_raises_message(
        sa.exc.InvalidRequestError,
        "Session already has a Connection associated",
        trans._connection_for_bind,
        testing.db.connect(),
        None,
    )

    trans.rollback()
    remaining = session.query(User).all()
    assert len(remaining) == 0
    session.close()
| 562 | |
| 563 | def test_bound_connection_transactional(self): |
| 564 | User, users = self.classes.User, self.tables.users |
nothing calls this directly
no test coverage detected