(self)
| 294 | |
| 295 | @gen_test |
| 296 | def test_acquire_timeout_preempted(self): |
| 297 | sem = locks.Semaphore(1) |
| 298 | yield sem.acquire() |
| 299 | |
| 300 | # The release is scheduled for 0.01s, which fires before the 0.02s acquire timeout below expires. |
| 301 | self.io_loop.call_later(0.01, sem.release) |
| 302 | acquire = sem.acquire(timedelta(seconds=0.02)) |
| 303 | yield gen.sleep(0.03) |
| 304 | yield acquire # No TimeoutError expected, even though the 0.03s sleep outlasts the 0.02s timeout. |
| 305 | |
| 306 | def test_release_unacquired(self): |
| 307 | # Unbounded releases are allowed, and increment the semaphore's value. |
nothing calls this directly
no test coverage detected