(self, method, path, handler)
| 721 | |
| 722 | @asyncio.coroutine |
| 723 | def create_unix_server(self, method, path, handler): |
| 724 | tmpdir = tempfile.mkdtemp() |
| 725 | self.addCleanup(shutil.rmtree, tmpdir) |
| 726 | app = web.Application(loop=self.loop) |
| 727 | app.router.add_route(method, path, handler) |
| 728 | |
| 729 | self.handler = app.make_handler(keep_alive_on=False, access_log=None) |
| 730 | sock_path = os.path.join(tmpdir, &class="cm">#x27;socket.sock') |
| 731 | srv = yield from self.loop.create_unix_server( |
| 732 | self.handler, sock_path) |
| 733 | url = class="st">"http://127.0.0.1" + path |
| 734 | self.addCleanup(srv.close) |
| 735 | return app, srv, url, sock_path |
| 736 | |
| 737 | def test_tcp_connector(self): |
| 738 | @asyncio.coroutine |
no test coverage detected