MCPcopy Index your code
hub / github.com/cloudpipe/cloudpickle / assert_run_python_script

Function assert_run_python_script

tests/testutils.py:179–216  ·  view source on GitHub ↗

Utility to help check pickleability of objects defined in __main__ The script provided in the source code should return 0 and not print anything on stderr or stdout.

(source_code, timeout=TIMEOUT)

Source from the content-addressed store, hash-verified

177
178
179def assert_run_python_script(source_code, timeout=TIMEOUT):
180 """Utility to help check pickleability of objects defined in __main__
181
182 The script provided in the source code should return 0 and not print
183 anything on stderr or stdout.
184 """
185 fd, source_file = tempfile.mkstemp(suffix="_src_test_cloudpickle.py")
186 os.close(fd)
187 try:
188 with open(source_file, "wb") as f:
189 f.write(source_code.encode("utf-8"))
190 cmd = [sys.executable, "-W ignore", source_file]
191 cwd, env = _make_cwd_env()
192 kwargs = {
193 "cwd": cwd,
194 "stderr": STDOUT,
195 "env": env,
196 }
197 # If coverage is running, pass the config file to the subprocess
198 coverage_rc = os.environ.get("COVERAGE_PROCESS_START")
199 if coverage_rc:
200 kwargs["env"]["COVERAGE_PROCESS_START"] = coverage_rc
201 kwargs["timeout"] = timeout
202 try:
203 try:
204 out = check_output(cmd, **kwargs)
205 except CalledProcessError as e:
206 raise RuntimeError(
207 "script errored with output:\n%s" % e.output.decode("utf-8")
208 ) from e
209 if out != b"":
210 raise AssertionError(out.decode("utf-8"))
211 except TimeoutExpired as e:
212 raise RuntimeError(
213 "script timeout, output so far:\n%s" % e.output.decode("utf-8")
214 ) from e
215 finally:
216 os.unlink(source_file)
217
218
219def check_deterministic_pickle(a, b):

Calls 2

_make_cwd_envFunction · 0.85
closeMethod · 0.80

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…