(self, async_engine)
| 13 | class AsyncScopedSessionTest(AsyncFixture): |
@async_test
async def test_basic(self, async_engine):
    """Exercise an ``async_scoped_session`` keyed on the current asyncio task.

    Verifies that calling the scoped registry twice yields the same
    session object and that this session is bound to ``async_engine``.
    Then, inside one ``begin()`` block, a ``User`` row is added and
    flushed, counted through the session, deleted and flushed again, and
    counted once more through the connection obtained from the session.
    """
    from asyncio import current_task

    session_factory = sa.orm.sessionmaker(async_engine, class_=_AsyncSession)
    scoped = async_scoped_session(session_factory, scopefunc=current_task)

    # Two calls to the registry must hand back the very same session.
    first = scoped()
    second = scoped()
    is_(first, second)
    is_(first.bind, async_engine)

    User = self.classes.User
    user_name = "scoped_async_session_u1"
    # Counts the rows carrying the name used by this test.
    count_stmt = select(func.count(User.id)).where(User.name == user_name)

    async with scoped.begin():
        new_user = User(name=user_name)
        scoped.add(new_user)
        await scoped.flush()

        # Grab the session's connection while the row is still pending
        # deletion; it is used for the final count below.
        conn = await scoped.connection()

        eq_(await scoped.scalar(count_stmt), 1)

        await scoped.delete(new_user)
        await scoped.flush()
        eq_(await conn.scalar(count_stmt), 0)
| 47 | |
| 48 | def test_attributes(self, async_engine): |
| 49 | from asyncio import current_task |
nothing calls this directly
no test coverage detected