(self)
| 161 | |
| 162 | @skipIfNonUnix # just because socketpair is so convenient |
| 163 | def test_read_while_writeable(self): |
| 164 | # Ensure that write events don't come in while we're waiting for |
| 165 | # a read and haven't asked for writeability. (the reverse is |
| 166 | # difficult to test for) |
| 167 | client, server = socket.socketpair() |
| 168 | try: |
| 169 | |
| 170 | def handler(fd, events): |
| 171 | self.assertEqual(events, IOLoop.READ) |
| 172 | self.stop() |
| 173 | |
| 174 | self.io_loop.add_handler(client.fileno(), handler, IOLoop.READ) |
| 175 | self.io_loop.add_timeout( |
| 176 | self.io_loop.time() + 0.01, functools.partial(server.send, b"asdf") |
| 177 | ) |
| 178 | self.wait() |
| 179 | self.io_loop.remove_handler(client.fileno()) |
| 180 | finally: |
| 181 | client.close() |
| 182 | server.close() |
| 183 | |
| 184 | def test_remove_timeout_after_fire(self): |
| 185 | # It is not an error to call remove_timeout after it has run. |
nothing calls this directly
no test coverage detected