MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / test_run_async

Method test_run_async

test/ext/asyncio/test_engine.py:245–266  ·  view source on GitHub ↗
(self, async_engine)

Source from the content-addressed store, hash-verified

243
@async_test
async def test_run_async(self, async_engine):
    """Invoke a coroutine via ``run_async`` from inside ``run_sync``.

    A synchronous callable handed to ``conn.run_sync()`` calls
    ``run_async()`` on the object found at ``connection.connection``,
    passing a lambda that returns a coroutine.  The test asserts that
    both ``run_async()`` and ``run_sync()`` hand back a plain value
    rather than a coroutine object, and that this value equals the
    string form of the driver connection obtained separately through
    ``get_raw_connection()``.
    """

    async def describe_connection(driver_conn):
        # no method is guaranteed to exist on every driver, so the
        # connection is stringified and compared with the string
        # built outside
        return str(driver_conn)

    def call_async_from_sync(connection):
        fairy = connection.connection
        outcome = fairy.run_async(
            lambda driver_connection: describe_connection(driver_connection)
        )
        # the coroutine must already have been awaited at this point
        assert not stdlib_inspect.iscoroutine(outcome)
        return outcome

    async with async_engine.connect() as conn:
        raw_connection = await conn.get_raw_connection()
        driver_connection = raw_connection.driver_connection

        res = await conn.run_sync(call_async_from_sync)

        assert not stdlib_inspect.iscoroutine(res)
        eq_(res, str(driver_connection))
267
268 @async_test
269 async def test_engine_eq_ne(self, async_engine):

Callers

nothing calls this directly

Calls 4

eq_ · Function · 0.90
get_raw_connection · Method · 0.80
connect · Method · 0.45
run_sync · Method · 0.45

Tested by

no test coverage detected