(self)
| 140 | |
| 141 | @gen_test |
| 142 | def test_subprocess(self): |
| 143 | subproc = Subprocess( |
| 144 | [sys.executable, "-u", "-i", "-I"], |
| 145 | stdin=Subprocess.STREAM, |
| 146 | stdout=Subprocess.STREAM, |
| 147 | stderr=subprocess.STDOUT, |
| 148 | ) |
| 149 | self.addCleanup(lambda: self.term_and_wait(subproc)) |
| 150 | self.addCleanup(subproc.stdout.close) |
| 151 | self.addCleanup(subproc.stdin.close) |
| 152 | yield subproc.stdout.read_until(b">>> ") |
| 153 | subproc.stdin.write(b"print('hello')\n") |
| 154 | data = yield subproc.stdout.read_until(b"\n") |
| 155 | self.assertEqual(data, b"hello\n") |
| 156 | |
| 157 | yield subproc.stdout.read_until(b">>> ") |
| 158 | subproc.stdin.write(b"raise SystemExit\n") |
| 159 | data = yield subproc.stdout.read_until_close() |
| 160 | self.assertEqual(data, b"") |
| 161 | |
| 162 | @gen_test |
| 163 | def test_close_stdin(self): |
nothing calls this directly
no test coverage detected