| 256 | self.assertIn("waiters", repr(sem)) |
| 257 | |
| 258 | def test_acquire(self): |
| 259 | sem = locks.Semaphore() |
| 260 | f0 = asyncio.ensure_future(sem.acquire()) |
| 261 | self.assertTrue(f0.done()) |
| 262 | |
| 263 | # Wait for release(). |
| 264 | f1 = asyncio.ensure_future(sem.acquire()) |
| 265 | self.assertFalse(f1.done()) |
| 266 | f2 = asyncio.ensure_future(sem.acquire()) |
| 267 | sem.release() |
| 268 | self.assertTrue(f1.done()) |
| 269 | self.assertFalse(f2.done()) |
| 270 | sem.release() |
| 271 | self.assertTrue(f2.done()) |
| 272 | |
| 273 | sem.release() |
| 274 | # Now acquire() is instant. |
| 275 | self.assertTrue(asyncio.ensure_future(sem.acquire()).done()) |
| 276 | self.assertEqual(0, len(sem._waiters)) |
| 277 | |
| 278 | @gen_test |
| 279 | def test_acquire_timeout(self): |