(self)
| 844 | |
| 845 | @gen_test |
| 846 | def test_unix_socket(self): |
| 847 | with closing(IOStream(socket.socket(socket.AF_UNIX))) as stream: |
| 848 | stream.connect(self.address) |
| 849 | stream.write(b"GET /hello HTTP/1.0\r\n\r\n") |
| 850 | response = yield stream.read_until(b"\r\n") |
| 851 | self.assertEqual(response, b"HTTP/1.1 200 OK\r\n") |
| 852 | header_data = yield stream.read_until(b"\r\n\r\n") |
| 853 | headers = HTTPHeaders.parse(header_data.decode("latin1")) |
| 854 | body = yield stream.read_bytes(int(headers["Content-Length"])) |
| 855 | self.assertEqual(body, b"Hello world") |
| 856 | |
| 857 | @gen_test |
| 858 | def test_unix_socket_bad_request(self): |
nothing calls this directly
no test coverage detected