(self)
| 211 | |
| 212 | @gen_test |
| 213 | def test_blocking_put_wait(self): |
| 214 | q = queues.Queue(1) # type: queues.Queue[int] |
| 215 | q.put_nowait(0) |
| 216 | |
| 217 | def get_and_discard(): |
| 218 | q.get() |
| 219 | |
| 220 | self.io_loop.call_later(0.01, get_and_discard) |
| 221 | self.io_loop.call_later(0.02, get_and_discard) |
| 222 | futures = [q.put(0), q.put(1)] |
| 223 | self.assertFalse(any(f.done() for f in futures)) |
| 224 | yield futures |
| 225 | |
| 226 | @gen_test |
| 227 | def test_put_timeout(self): |
nothing calls this directly
no test coverage detected