(
future: Future, timeout: Union[None, float, datetime.timedelta]
)
| 57 | |
| 58 | |
def _set_timeout(
    future: Future, timeout: Union[None, float, datetime.timedelta]
) -> None:
    """Attach an optional timeout to ``future``.

    When ``timeout`` is truthy, a callback is scheduled on the current
    ``IOLoop`` via ``add_timeout``; when it fires and ``future`` is still
    pending, the future is failed with ``gen.TimeoutError``.  Once the
    future completes for any reason, the scheduled timeout is removed again.

    A falsy ``timeout`` (``None``, ``0`` or a zero ``timedelta``) means
    "no timeout": the function returns without touching the IOLoop.
    """
    if not timeout:
        return

    loop = ioloop.IOLoop.current()

    def _expire() -> None:
        # The future may already have been resolved by the time the
        # timeout fires; in that case leave its result untouched.
        if future.done():
            return
        future.set_exception(gen.TimeoutError())

    handle = loop.add_timeout(timeout, _expire)

    def _cancel_pending_timeout(_completed: Future) -> None:
        # Drop the scheduled callback as soon as the future is done so it
        # does not linger on the loop.
        loop.remove_timeout(handle)

    future.add_done_callback(_cancel_pending_timeout)
| 71 | |
| 72 | |
| 73 | class _QueueIterator(Generic[_T]): |
no test coverage detected