MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / test_scalars

Method test_scalars

test/ext/asyncio/test_engine.py:1446–1455  ·  view source on GitHub ↗
(self, async_engine, case)

Source from the content-addressed store, hash-verified

1444 @testing.combinations(("scalars",), ("stream_scalars",), argnames="case")
1445 @async_test
1446 async def test_scalars(self, async_engine, case):
1447 users = self.tables.users
1448 stmt = select(users).order_by(users.c.user_id)
1449 async with async_engine.connect() as conn:
1450 if case == "scalars":
1451 result = (await conn.scalars(stmt)).all()
1452 elif case == "stream_scalars":
1453 result = await (await conn.stream_scalars(stmt)).all()
1454
1455 eq_(result, list(range(1, 20)))
1456
1457 @async_test
1458 @testing.combinations(("stream",), ("stream_scalars",), argnames="case")

Callers

nothing calls this directly

Calls 7

selectFunction · 0.90
eq_Function · 0.90
order_byMethod · 0.45
connectMethod · 0.45
allMethod · 0.45
scalarsMethod · 0.45
stream_scalarsMethod · 0.45

Tested by

no test coverage detected