(self)
| 277 | |
@gen_test
def test_acquire_timeout(self):
    """Check that a timed-out ``acquire`` raises and the semaphore stays usable.

    Both permits of a two-permit semaphore are taken, then a third acquire
    is started with a timeout shorter than the delay before a scheduled
    ``release``. Yielding that acquire is expected to raise
    ``gen.TimeoutError``. Afterwards the test checks that a pending acquire
    is completed synchronously by ``release``.
    """
    semaphore = locks.Semaphore(2)
    # Take both permits so the next acquire has to wait.
    yield semaphore.acquire()
    yield semaphore.acquire()

    # The waiter's timeout (0.01s) is shorter than the delay before the
    # scheduled release (0.02s).
    short_timeout = timedelta(seconds=0.01)
    timed_waiter = semaphore.acquire(short_timeout)
    self.io_loop.call_later(0.02, semaphore.release)  # Too late.
    yield gen.sleep(0.3)
    with self.assertRaises(gen.TimeoutError):
        yield timed_waiter

    # Deliberately not yielded: the assertions below inspect future state
    # synchronously, without returning control to the IO loop.
    semaphore.acquire()
    pending = asyncio.ensure_future(semaphore.acquire())
    self.assertFalse(pending.done())
    semaphore.release()
    self.assertTrue(pending.done())
| 294 | |
| 295 | @gen_test |
| 296 | def test_acquire_timeout_preempted(self): |
nothing calls this directly
no test coverage detected