(self)
| 241 | |
| 242 | @gen_test |
| 243 | def test_put_timeout_preempted(self): |
| 244 | q = queues.Queue(1) # type: queues.Queue[int] |
| 245 | q.put_nowait(0) |
| 246 | put = q.put(1, timeout=timedelta(seconds=0.01)) |
| 247 | q.get() |
| 248 | yield gen.sleep(0.02) |
| 249 | yield put # No TimeoutError. |
| 250 | |
| 251 | @gen_test |
| 252 | def test_put_clears_timed_out_putters(self): |
nothing calls this directly
no test coverage detected