(self)
| 463 | |
@testing.requires.independent_connections
def test_invalidate(self):
    """Check connection state before and after ``sess.invalidate()``.

    The Connection obtained from the session before the call must end
    up closed (but not flagged as invalidated), the pooled DBAPI
    connection behind it must no longer be valid, and a Connection
    obtained from the session afterwards must be valid again.
    """
    User = self.classes.User
    users = self.tables.users
    self.mapper_registry.map_imperatively(User, users)

    sess = fixture_session()

    # Keep a local reference to the instance for the whole test.
    pending_user = User(name="u1")
    sess.add(pending_user)
    sess.flush()

    conn_before = sess.connection(bind_arguments={"mapper": User})
    pooled_conn = conn_before.connection
    assert pooled_conn.is_valid

    sess.invalidate()

    # The Connection handed out earlier is now closed ...
    assert conn_before.closed

    # ... yet "closed" does not imply the "invalidated" flag.
    assert not conn_before.invalidated

    # The DBAPI-level connection (actually a ConnectionFairy) is the
    # object that got invalidated.
    assert not pooled_conn.is_valid

    # Querying again returns no rows (presumably because the flushed
    # row was never committed -- confirm against Session semantics).
    eq_(sess.query(User).all(), [])

    conn_after = sess.connection(bind_arguments={"mapper": User})
    assert not conn_after.invalidated
    assert conn_after.connection.is_valid
| 492 | |
| 493 | @testing.requires.savepoints |
| 494 | def test_nested_transaction(self): |
nothing calls this directly
no test coverage detected