(self, family, host, source_ip=None, source_port=None)
| 86 | |
| 87 | @gen_test |
| 88 | def do_test_connect(self, family, host, source_ip=None, source_port=None): |
| 89 | port = self.start_server(family) |
| 90 | stream = yield self.client.connect( |
| 91 | host, |
| 92 | port, |
| 93 | source_ip=source_ip, |
| 94 | source_port=source_port, |
| 95 | af=family, |
| 96 | ) |
| 97 | assert self.server is not None |
| 98 | server_stream = yield self.server.queue.get() |
| 99 | with closing(stream): |
| 100 | stream.write(b"hello") |
| 101 | data = yield server_stream.read_bytes(5) |
| 102 | self.assertEqual(data, b"hello") |
| 103 | |
| 104 | def test_connect_ipv4_ipv4(self): |
| 105 | self.do_test_connect(socket.AF_INET, "127.0.0.1") |
no test coverage detected