MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / thread_go

Method thread_go

test/base/test_concurrency.py:200–206  ·  view source on GitHub ↗
(q)

Source from the content-addressed store, hash-verified

198 run = [False]
199
200 def thread_go(q):
201 def go():
202 q.get(timeout=0.1)
203
204 with expect_raises(queue.Empty):
205 asyncio.run(greenlet_spawn(go))
206 run[0] = True
207
208 t = threading.Thread(
209 target=thread_go, args=[queue.AsyncAdaptedQueue()]

Callers

nothing calls this directly

Calls 4

expect_raisesFunction · 0.90
greenlet_spawnFunction · 0.90
expect_raises_messageFunction · 0.90
runMethod · 0.45

Tested by

no test coverage detected