(self)
| 3013 | p2.stdout.close() |
| 3014 | |
| 3015 | def test_close_fds(self): |
| 3016 | fd_status = support.findfile("fd_status.py", subdir="subprocessdata") |
| 3017 | |
| 3018 | fds = os.pipe() |
| 3019 | self.addCleanup(os.close, fds[0]) |
| 3020 | self.addCleanup(os.close, fds[1]) |
| 3021 | |
| 3022 | open_fds = set(fds) |
| 3023 | # add a bunch more fds |
| 3024 | for _ in range(9): |
| 3025 | fd = os.open(os.devnull, os.O_RDONLY) |
| 3026 | self.addCleanup(os.close, fd) |
| 3027 | open_fds.add(fd) |
| 3028 | |
| 3029 | for fd in open_fds: |
| 3030 | os.set_inheritable(fd, True) |
| 3031 | |
| 3032 | p = subprocess.Popen([sys.executable, fd_status], |
| 3033 | stdout=subprocess.PIPE, close_fds=False) |
| 3034 | output, ignored = p.communicate() |
| 3035 | remaining_fds = set(map(int, output.split(b','))) |
| 3036 | |
| 3037 | self.assertEqual(remaining_fds & open_fds, open_fds, |
| 3038 | "Some fds were closed") |
| 3039 | |
| 3040 | p = subprocess.Popen([sys.executable, fd_status], |
| 3041 | stdout=subprocess.PIPE, close_fds=True) |
| 3042 | output, ignored = p.communicate() |
| 3043 | remaining_fds = set(map(int, output.split(b','))) |
| 3044 | |
| 3045 | self.assertFalse(remaining_fds & open_fds, |
| 3046 | "Some fds were left open") |
| 3047 | self.assertIn(1, remaining_fds, "Subprocess failed") |
| 3048 | |
| 3049 | # Keep some of the fd's we opened open in the subprocess. |
| 3050 | # This tests _posixsubprocess.c's proper handling of fds_to_keep. |
| 3051 | fds_to_keep = set(open_fds.pop() for _ in range(8)) |
| 3052 | p = subprocess.Popen([sys.executable, fd_status], |
| 3053 | stdout=subprocess.PIPE, close_fds=True, |
| 3054 | pass_fds=fds_to_keep) |
| 3055 | output, ignored = p.communicate() |
| 3056 | remaining_fds = set(map(int, output.split(b','))) |
| 3057 | |
| 3058 | self.assertFalse((remaining_fds - fds_to_keep) & open_fds, |
| 3059 | "Some fds not in pass_fds were left open") |
| 3060 | self.assertIn(1, remaining_fds, "Subprocess failed") |
| 3061 | |
| 3062 | |
| 3063 | @unittest.skipIf(sys.platform.startswith("freebsd") and |
nothing calls this directly
no test coverage detected