hub / github.com/tornadoweb/tornado / test_acquire

Method test_acquire

tornado/test/locks_test.py:258–276
(self)

Source from the content-addressed store, hash-verified

        self.assertIn("waiters", repr(sem))

    def test_acquire(self):
        sem = locks.Semaphore()
        f0 = asyncio.ensure_future(sem.acquire())
        self.assertTrue(f0.done())

        # Wait for release().
        f1 = asyncio.ensure_future(sem.acquire())
        self.assertFalse(f1.done())
        f2 = asyncio.ensure_future(sem.acquire())
        sem.release()
        self.assertTrue(f1.done())
        self.assertFalse(f2.done())
        sem.release()
        self.assertTrue(f2.done())

        sem.release()
        # Now acquire() is instant.
        self.assertTrue(asyncio.ensure_future(sem.acquire()).done())
        self.assertEqual(0, len(sem._waiters))

    @gen_test
    def test_acquire_timeout(self):
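
The hand-off order that test_acquire asserts can be sketched without Tornado. The block below is a simplified illustration, not Tornado's implementation: `TinySemaphore` and `demo` are hypothetical names, and the only assumptions carried over from the test are that the initial value is 1, that acquire() returns a future that is already done when a slot is free, and that release() wakes the oldest waiter first.

```python
import asyncio
from collections import deque


class TinySemaphore:
    """Hypothetical stand-in for illustration; not tornado.locks.Semaphore."""

    def __init__(self, value=1):
        self._value = value
        self._waiters = deque()

    def acquire(self):
        # Return a future instead of being a coroutine, so the caller can
        # check done() right away, as the test does.
        fut = asyncio.get_running_loop().create_future()
        if self._value > 0:
            self._value -= 1
            fut.set_result(None)  # slot was free: resolved immediately
        else:
            self._waiters.append(fut)  # no slot: queue in arrival order
        return fut

    def release(self):
        # Hand the slot to the oldest pending waiter, if there is one.
        while self._waiters:
            fut = self._waiters.popleft()
            if not fut.done():
                fut.set_result(None)
                return
        # Nobody is waiting, so the slot goes back into the counter.
        self._value += 1


async def demo():
    sem = TinySemaphore()
    f0, f1, f2 = sem.acquire(), sem.acquire(), sem.acquire()
    states = [(f0.done(), f1.done(), f2.done())]
    sem.release()  # wakes f1 only
    states.append((f0.done(), f1.done(), f2.done()))
    sem.release()  # wakes f2
    states.append((f0.done(), f1.done(), f2.done()))
    sem.release()  # no waiters left, so the next acquire() is instant
    f3 = sem.acquire()
    states.append((f3.done(), len(sem._waiters)))
    return states


states = asyncio.run(demo())
```

Each entry in `states` corresponds to one group of assertions in the test: the first acquire resolves at once, each release() resolves exactly one queued future in FIFO order, and a release() with an empty queue makes the following acquire() resolve immediately.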

Callers

nothing calls this directly

Calls 3

acquire · Method · 0.95
release · Method · 0.95
done · Method · 0.45

Tested by

no test coverage detected