MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / test_basic

Method test_basic

test/ext/asyncio/test_scoping.py:15–46  ·  view source on GitHub ↗
(self, async_engine)

Source from the content-addressed store, hash-verified

13class AsyncScopedSessionTest(AsyncFixture):
14 @async_test
15 async def test_basic(self, async_engine):
16 from asyncio import current_task
17
18 AsyncSession = async_scoped_session(
19 sa.orm.sessionmaker(async_engine, class_=_AsyncSession),
20 scopefunc=current_task,
21 )
22
23 some_async_session = AsyncSession()
24 some_other_async_session = AsyncSession()
25
26 is_(some_async_session, some_other_async_session)
27 is_(some_async_session.bind, async_engine)
28
29 User = self.classes.User
30
31 async with AsyncSession.begin():
32 user_name = "scoped_async_session_u1"
33 u1 = User(name=user_name)
34
35 AsyncSession.add(u1)
36
37 await AsyncSession.flush()
38
39 conn = await AsyncSession.connection()
40
41 stmt = select(func.count(User.id)).where(User.name == user_name)
42 eq_(await AsyncSession.scalar(stmt), 1)
43
44 await AsyncSession.delete(u1)
45 await AsyncSession.flush()
46 eq_(await conn.scalar(stmt), 0)
47
48 def test_attributes(self, async_engine):
49 from asyncio import current_task

Callers

nothing calls this directly

Calls 14

is_Function · 0.90
selectFunction · 0.90
eq_Function · 0.90
AsyncSessionClass · 0.85
UserClass · 0.50
beginMethod · 0.45
addMethod · 0.45
flushMethod · 0.45
connectionMethod · 0.45
whereMethod · 0.45
countMethod · 0.45

Tested by

no test coverage detected