(self)
| 1168 | self.engine.dispose() |
| 1169 | |
def test_reconnect(self):
    """Exercise invalidation and recovery of one connection, twice.

    Each cycle calls ``self.engine.test_shutdown()``, checks that the
    next ``execute()`` leaves the connection flagged as invalidated,
    and then checks that ``rollback()`` makes the same connection
    usable again with the invalidated flag cleared.  The first cycle
    additionally checks that the connection is not closed and that
    executing before the rollback raises ``PendingRollbackError``.
    """
    with self.engine.connect() as conn:
        # Baseline: the connection works before anything is shut down.
        eq_(conn.execute(select(1)).scalar(), 1)
        assert not conn.closed

        # NOTE(review): test_shutdown() is defined outside this view;
        # presumably it simulates the database dropping its
        # connections -- confirm against the engine fixture.
        self.engine.test_shutdown()

        # NOTE(review): _assert_invalidated is a helper defined
        # elsewhere; presumably it asserts that the call fails with an
        # error that invalidates the connection -- confirm.
        _assert_invalidated(conn.execute, select(1))

        # Invalidated is a separate state from closed.
        assert not conn.closed
        assert conn.invalidated

        # Using the connection again before rolling back must raise.
        with expect_raises(tsa.exc.PendingRollbackError):
            conn.execute(select(1))

        conn.rollback()

        # After the rollback the connection works again.
        eq_(conn.execute(select(1)).scalar(), 1)
        assert not conn.invalidated

        # one more time
        self.engine.test_shutdown()
        _assert_invalidated(conn.execute, select(1))

        assert conn.invalidated
        conn.rollback()

        eq_(conn.execute(select(1)).scalar(), 1)
        assert not conn.invalidated
| 1201 | |
| 1202 | def test_detach_invalidated(self): |
| 1203 | with self.engine.connect() as conn: |
nothing calls this directly
no test coverage detected