Wait for PID to terminate using kqueue(). macOS and BSD only.
(self, timeout)
| 2129 | os.close(pidfd) |
| 2130 | |
def _wait_kqueue(self, timeout):
    """Wait for self.pid to exit via a kqueue() process event.

    macOS and BSD only.

    Registers a one-shot KQ_FILTER_PROC / KQ_NOTE_EXIT event for
    self.pid and waits up to *timeout* for it (timeout is passed
    unchanged to kqueue.control()).

    Returns True if control() reported an event.  Returns False if
    kqueue cannot be used: _CAN_USE_KQUEUE is false, or creating the
    kqueue or waiting on it raised OSError.
    NOTE(review): False presumably tells the caller to fall back to
    another wait strategy -- confirm against _wait().

    Raises TimeoutExpired if control() returned no events.
    """
    if not _CAN_USE_KQUEUE:
        return False

    try:
        queue = select.kqueue()
    except OSError:
        # Most likely EMFILE / ENFILE (too many open files).
        return False

    # From here on the kqueue descriptor is open; the finally clause
    # closes it on every exit path (return, timeout, or error).
    try:
        event_flags = select.KQ_EV_ADD | select.KQ_EV_ONESHOT
        exit_event = select.kevent(
            self.pid,
            filter=select.KQ_FILTER_PROC,
            flags=event_flags,
            fflags=select.KQ_NOTE_EXIT,
        )
        try:
            # Register the event and wait for at most one result.
            fired = queue.control([exit_event], 1, timeout)
        except OSError:
            return False

        if fired:
            return True
        # No event was returned by control().
        raise TimeoutExpired(self.args, timeout)
    finally:
        queue.close()
| 2158 | |
| 2159 | def _wait(self, timeout): |
| 2160 | """Internal implementation of wait() on POSIX. |
no test coverage detected