MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / test_require_await

Method test_require_await

test/base/test_concurrency.py:181–191  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

179
180 @async_test
181 async def test_require_await(self):
182 def run():
183 return 1 + 1
184
185 assert (await greenlet_spawn(run)) == 2
186
187 with expect_raises_message(
188 exc.AwaitRequired,
189 "The current operation required an async execution but none was",
190 ):
191 await greenlet_spawn(run, _require_await=True)
192
193
194class TestAsyncAdaptedQueue(fixtures.TestBase):

Callers

nothing calls this directly

Calls 2

greenlet_spawnFunction · 0.90
expect_raises_messageFunction · 0.90

Tested by

no test coverage detected