Utility to help check pickleability of objects defined in `__main__`. The script provided in the source code should return 0 and not print anything on stderr or stdout.
(source_code, timeout=TIMEOUT)
| 177 | |
| 178 | |
def assert_run_python_script(source_code, timeout=TIMEOUT):
    """Utility to help check pickleability of objects defined in __main__

    The script provided in the source code should return 0 and not print
    anything on stderr or stdout.

    The source is written to a temporary file, executed with the current
    interpreter (``sys.executable``) in a subprocess, and the temporary file
    is removed afterwards whether or not the run succeeded.

    Parameters
    ----------
    source_code : str
        Python source to execute; it is encoded as UTF-8 before being
        written to disk.
    timeout : float
        Passed as the ``timeout`` argument of ``check_output``.

    Raises
    ------
    RuntimeError
        If the script exits with a non-zero status, or does not finish
        before ``timeout`` expires. The captured output is included in the
        message.
    AssertionError
        If the script exits successfully but produced any output (stderr
        is redirected into stdout).
    """
    fd, source_file = tempfile.mkstemp(suffix="_src_test_cloudpickle.py")
    # Only the path is needed: the file is reopened below.
    os.close(fd)
    try:
        with open(source_file, "wb") as f:
            f.write(source_code.encode("utf-8"))
        cmd = [sys.executable, "-W ignore", source_file]
        cwd, env = _make_cwd_env()
        kwargs = {
            "cwd": cwd,
            # Merge stderr into stdout so any output at all is detected.
            "stderr": STDOUT,
            "env": env,
            "timeout": timeout,
        }
        # If coverage is running, pass the config file to the subprocess
        coverage_rc = os.environ.get("COVERAGE_PROCESS_START")
        if coverage_rc:
            kwargs["env"]["COVERAGE_PROCESS_START"] = coverage_rc
        try:
            out = check_output(cmd, **kwargs)
        except CalledProcessError as e:
            # ``or b""`` is defensive; ``errors="replace"`` keeps a stray
            # non-UTF-8 byte from hiding the real failure.
            output = (e.output or b"").decode("utf-8", errors="replace")
            raise RuntimeError(
                "script errored with output:\n%s" % output
            ) from e
        except TimeoutExpired as e:
            # ``e.output`` is None when the child wrote nothing before the
            # timeout; calling ``.decode`` on it would raise AttributeError
            # and mask the timeout itself.
            output = (e.output or b"").decode("utf-8", errors="replace")
            raise RuntimeError(
                "script timeout, output so far:\n%s" % output
            ) from e
        if out != b"":
            raise AssertionError(out.decode("utf-8", errors="replace"))
    finally:
        os.unlink(source_file)
| 217 | |
| 218 | |
| 219 | def check_deterministic_pickle(a, b): |
no test coverage detected
searching dependent graphs…