MCPcopy
hub / github.com/pytest-dev/pytest / LsofFdLeakChecker

Class LsofFdLeakChecker

src/_pytest/pytester.py:122–192  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

120
121
122class LsofFdLeakChecker:
123 def get_open_files(self) -> list[tuple[str, str]]:
124 if sys.version_info >= (3, 11):
125 # New in Python 3.11, ignores utf-8 mode
126 encoding = locale.getencoding()
127 else:
128 encoding = locale.getpreferredencoding(False)
129 out = subprocess.run(
130 ("lsof", "-Ffn0", "-p", str(os.getpid())),
131 stdout=subprocess.PIPE,
132 stderr=subprocess.DEVNULL,
133 check=True,
134 text=True,
135 encoding=encoding,
136 ).stdout
137
138 def isopen(line: str) -> bool:
139 return line.startswith("f") and (
140 "deleted" not in line
141 and "mem" not in line
142 and "txt" not in line
143 and "cwd" not in line
144 )
145
146 open_files = []
147
148 for line in out.split("\n"):
149 if isopen(line):
150 fields = line.split("\0")
151 fd = fields[0][1:]
152 filename = fields[1][1:]
153 if filename in IGNORE_PAM:
154 continue
155 if filename.startswith("/"):
156 open_files.append((fd, filename))
157
158 return open_files
159
160 def matching_platform(self) -> bool:
161 try:
162 subprocess.run(("lsof", "-v"), check=True)
163 except (OSError, subprocess.CalledProcessError):
164 return False
165 else:
166 return True
167
168 @hookimpl(wrapper=True, tryfirst=True)
169 def pytest_runtest_protocol(self, item: Item) -> Generator[None, object, object]:
170 lines1 = self.get_open_files()
171 try:
172 return (yield)
173 finally:
174 if hasattr(sys, "pypy_version_info"):
175 gc.collect()
176 lines2 = self.get_open_files()
177
178 new_fds = {t[0] for t in lines2} - {t[0] for t in lines1}
179 leaked_files = [t for t in lines2 if t[0] in new_fds]

Callers 1

pytest_configureFunction · 0.85

Calls

no outgoing calls

Tested by 1

pytest_configureFunction · 0.68