MCPcopy
hub / github.com/tornadoweb/tornado / test_acquire_timeout

Method test_acquire_timeout

tornado/test/locks_test.py:279–293  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

277
278 @gen_test
279 def test_acquire_timeout(self):
280 sem = locks.Semaphore(2)
281 yield sem.acquire()
282 yield sem.acquire()
283 acquire = sem.acquire(timedelta(seconds=0.01))
284 self.io_loop.call_later(0.02, sem.release) # Too late.
285 yield gen.sleep(0.3)
286 with self.assertRaises(gen.TimeoutError):
287 yield acquire
288
289 sem.acquire()
290 f = asyncio.ensure_future(sem.acquire())
291 self.assertFalse(f.done())
292 sem.release()
293 self.assertTrue(f.done())
294
295 @gen_test
296 def test_acquire_timeout_preempted(self):

Callers

nothing calls this directly

Calls 4

acquireMethod · 0.95
releaseMethod · 0.95
call_laterMethod · 0.80
doneMethod · 0.45

Tested by

no test coverage detected