(self)
| 503 | file.unlink() |
| 504 | |
| 505 | def test_pre_fork_compile(self): |
| | # Run a helper script under `-Xperf` that compiles perf trampoline |
| | # entries before calling os.fork(), then check that a |
| | # /tmp/perf-<pid>.map file exists for both the parent process and the |
| | # forked child. |
| | # |
| | # What the embedded script does: |
| | #   * imports compile_perf_trampoline_entry and |
| | #     perf_trampoline_set_persist_after_fork from _testinternalcapi; |
| | #   * defines four small functions (foo_fork, bar_fork, foo, bar); |
| | #   * compile_trampolines_for_all_functions() calls |
| | #     perf_trampoline_set_persist_after_fork(1) and then |
| | #     compile_perf_trampoline_entry() on the __code__ of every callable |
| | #     module global that has a __code__ attribute; |
| | #   * when run as __main__ it compiles the trampolines, forks, and then |
| | #     the child prints its own PID and calls bar_fork() while the |
| | #     parent calls bar() (which reaches foo() and sleeps for 1 second). |
| | # NOTE(review): judging by its name, perf_trampoline_set_persist_after_fork(1) |
| | # keeps the compiled trampolines usable in the child -- confirm against |
| | # _testinternalcapi. |
| 506 | code = """if 1: |
| 507 | import sys |
| 508 | import os |
| 509 | import sysconfig |
| 510 | from _testinternalcapi import ( |
| 511 | compile_perf_trampoline_entry, |
| 512 | perf_trampoline_set_persist_after_fork, |
| 513 | ) |
| 514 | |
| 515 | def foo_fork(): |
| 516 | pass |
| 517 | |
| 518 | def bar_fork(): |
| 519 | foo_fork() |
| 520 | |
| 521 | def foo(): |
| 522 | import time; time.sleep(1) |
| 523 | |
| 524 | def bar(): |
| 525 | foo() |
| 526 | |
| 527 | def compile_trampolines_for_all_functions(): |
| 528 | perf_trampoline_set_persist_after_fork(1) |
| 529 | for _, obj in globals().items(): |
| 530 | if callable(obj) and hasattr(obj, '__code__'): |
| 531 | compile_perf_trampoline_entry(obj.__code__) |
| 532 | |
| 533 | if __name__ == "__main__": |
| 534 | compile_trampolines_for_all_functions() |
| 535 | pid = os.fork() |
| 536 | if pid == 0: |
| 537 | print(os.getpid()) |
| 538 | bar_fork() |
| 539 | else: |
| 540 | bar() |
| 541 | """ |
| 542 | |
| | # Write the script into a temporary directory and run it with the current |
| | # interpreter and the -Xperf option, capturing stdout/stderr as text. |
| 543 | with temp_dir() as script_dir: |
| 544 | script = make_script(script_dir, "perftest", code) |
| | # Inherit the current environment but force PYTHON_JIT=0. |
| | # NOTE(review): presumably this turns the JIT off so it cannot |
| | # interfere with the perf trampolines -- confirm. |
| 545 | env = {**os.environ, "PYTHON_JIT": "0"} |
| 546 | with subprocess.Popen( |
| 547 | [sys.executable, "-Xperf", script], |
| 548 | universal_newlines=True, |
| 549 | stderr=subprocess.PIPE, |
| 550 | stdout=subprocess.PIPE, |
| 551 | env=env, |
| 552 | ) as process: |
| 553 | stdout, stderr = process.communicate() |
| 554 | |
| | # The launched (parent) process must exit with status 0 and must not have |
| | # written "Error:" to stderr. |
| 555 | self.assertEqual(process.returncode, 0) |
| 556 | self.assertNotIn("Error:", stderr) |
| | # The only print() in the script is the child's os.getpid(), so stdout is |
| | # parsed as the forked child's PID. |
| 557 | child_pid = int(stdout.strip()) |
| | # One map file is expected per process: the parent's is keyed by |
| | # process.pid, the child's by the PID it printed. |
| 558 | perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map") |
| 559 | perf_child_file = pathlib.Path(f"/tmp/perf-{child_pid}.map") |
| 560 | self.assertTrue(perf_file.exists()) |
| 561 | self.assertTrue(perf_child_file.exists()) |
| 562 |
nothing calls this directly
no test coverage detected