test new usecase close() added along with #7274
(self)
| 5343 | pass |
| 5344 | |
| 5345 | def test_we_can_close_cursor(self): |
| 5346 | """test new usecase close() added along with #7274""" |
| 5347 | self._eagerload_mappings() |
| 5348 | |
| 5349 | User = self.classes.User |
| 5350 | |
| 5351 | sess = fixture_session() |
| 5352 | |
| 5353 | stmt = select(User).execution_options(yield_per=15) |
| 5354 | result = sess.execute(stmt) |
| 5355 | |
| 5356 | with mock.patch.object(result.raw, "_soft_close") as mock_close: |
| 5357 | two_results = result.fetchmany(2) |
| 5358 | eq_(len(two_results), 2) |
| 5359 | |
| 5360 | eq_(mock_close.mock_calls, []) |
| 5361 | |
| 5362 | result.close() |
| 5363 | |
| 5364 | eq_(mock_close.mock_calls, [mock.call(hard=True)]) |
| 5365 | |
| 5366 | with expect_raises(sa.exc.ResourceClosedError): |
| 5367 | result.fetchmany(10) |
| 5368 | |
| 5369 | with expect_raises(sa.exc.ResourceClosedError): |
| 5370 | result.fetchone() |
| 5371 | |
| 5372 | with expect_raises(sa.exc.ResourceClosedError): |
| 5373 | result.all() |
| 5374 | |
| 5375 | result.close() |
| 5376 | |
| 5377 | @testing.combinations("fetchmany", "fetchone", "fetchall") |
| 5378 | def test_cursor_is_closed_on_exhausted(self, fetch_method): |
nothing calls this directly
no test coverage detected