(self)
| 2724 | os.close(new_stdout) |
| 2725 | |
| 2726 | def test_remapping_std_fds(self): |
| 2727 | # open up some temporary files |
| 2728 | temps = [tempfile.mkstemp() for i in range(3)] |
| 2729 | try: |
| 2730 | temp_fds = [fd for fd, fname in temps] |
| 2731 | |
| 2732 | # unlink the files -- we won't need to reopen them |
| 2733 | for fd, fname in temps: |
| 2734 | os.unlink(fname) |
| 2735 | |
| 2736 | # write some data to what will become stdin, and rewind |
| 2737 | os.write(temp_fds[1], b"STDIN") |
| 2738 | os.lseek(temp_fds[1], 0, 0) |
| 2739 | |
| 2740 | # move the standard file descriptors out of the way |
| 2741 | saved_fds = self._save_fds(range(3)) |
| 2742 | try: |
| 2743 | # duplicate the file objects over the standard fd's |
| 2744 | for fd, temp_fd in enumerate(temp_fds): |
| 2745 | os.dup2(temp_fd, fd) |
| 2746 | |
| 2747 | # now use those files in the "wrong" order, so that subprocess |
| 2748 | # has to rearrange them in the child |
| 2749 | p = subprocess.Popen([sys.executable, "-c", |
| 2750 | 'import sys; got = sys.stdin.read();' |
| 2751 | 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'], |
| 2752 | stdin=temp_fds[1], |
| 2753 | stdout=temp_fds[2], |
| 2754 | stderr=temp_fds[0]) |
| 2755 | p.wait() |
| 2756 | finally: |
| 2757 | self._restore_fds(saved_fds) |
| 2758 | |
| 2759 | for fd in temp_fds: |
| 2760 | os.lseek(fd, 0, 0) |
| 2761 | |
| 2762 | out = os.read(temp_fds[2], 1024) |
| 2763 | err = os.read(temp_fds[0], 1024).strip() |
| 2764 | self.assertEqual(out, b"got STDIN") |
| 2765 | self.assertEqual(err, b"err") |
| 2766 | |
| 2767 | finally: |
| 2768 | for fd in temp_fds: |
| 2769 | os.close(fd) |
| 2770 | |
| 2771 | def check_swap_fds(self, stdin_no, stdout_no, stderr_no): |
| 2772 | # open up some temporary files |
nothing calls this directly
no test coverage detected