(self)
| 99 | |
@gen_test
def test_blocking_get_wait(self):
    """Check that ``get`` with a timeout returns items in insertion order.

    One item (0) is put on the queue up front; two more (1 and 2) are
    scheduled on the IO loop after 0.01s and 0.02s.  Two consecutive
    ``get`` calls, each with a one-second timeout, must yield 0 and
    then 1.  The third item is scheduled but never fetched by this test.
    """
    queue = queues.Queue()  # type: queues.Queue[int]
    queue.put(0)

    # Schedule the delayed producers in the same order as their delays.
    for delay, item in ((0.01, 1), (0.02, 2)):
        self.io_loop.call_later(delay, queue.put_nowait, item)

    wait = timedelta(seconds=1)

    # The first item is already queued.
    first = yield queue.get(timeout=wait)
    self.assertEqual(0, first)

    # The second item arrives via the 0.01s delayed put scheduled above.
    second = yield queue.get(timeout=wait)
    self.assertEqual(1, second)
| 108 | |
| 109 | @gen_test |
| 110 | def test_get_timeout(self): |
nothing calls this directly
no test coverage detected