(self, fd: int, readable: bool, writable: bool)
| 39 | self.fds = {} # type: Dict[int, int] |
| 40 | |
| 41 | def _sock_state_cb(self, fd: int, readable: bool, writable: bool) -> None: |
| 42 | state = (IOLoop.READ if readable else 0) | (IOLoop.WRITE if writable else 0) |
| 43 | if not state: |
| 44 | self.io_loop.remove_handler(fd) |
| 45 | del self.fds[fd] |
| 46 | elif fd in self.fds: |
| 47 | self.io_loop.update_handler(fd, state) |
| 48 | self.fds[fd] = state |
| 49 | else: |
| 50 | self.io_loop.add_handler(fd, self._handle_events, state) |
| 51 | self.fds[fd] = state |
| 52 | |
| 53 | def _handle_events(self, fd: int, events: int) -> None: |
| 54 | read_fd = pycares.ARES_SOCKET_BAD |
nothing calls this directly
no test coverage detected