(self)
| 497 | @threading_helper.reap_threads |
| 498 | @threading_helper.requires_working_threading() |
| 499 | def test_current_frames(self): |
| 500 | import threading |
| 501 | import traceback |
| 502 | |
| 503 | # Spawn a thread that blocks at a known place. Then the main |
| 504 | # thread does sys._current_frames(), and verifies that the frames |
| 505 | # returned make sense. |
| 506 | entered_g = threading.Event() |
| 507 | leave_g = threading.Event() |
| 508 | thread_info = [] # the thread's id |
| 509 | |
| 510 | def f123(): |
| 511 | g456() |
| 512 | |
| 513 | def g456(): |
| 514 | thread_info.append(threading.get_ident()) |
| 515 | entered_g.set() |
| 516 | leave_g.wait() |
| 517 | |
| 518 | t = threading.Thread(target=f123) |
| 519 | t.start() |
| 520 | entered_g.wait() |
| 521 | |
| 522 | try: |
| 523 | # At this point, t has finished its entered_g.set(), although it's |
| 524 | # impossible to guess whether it's still on that line or has moved on |
| 525 | # to its leave_g.wait(). |
| 526 | self.assertEqual(len(thread_info), 1) |
| 527 | thread_id = thread_info[0] |
| 528 | |
| 529 | d = sys._current_frames() |
| 530 | for tid in d: |
| 531 | self.assertIsInstance(tid, int) |
| 532 | self.assertGreater(tid, 0) |
| 533 | |
| 534 | main_id = threading.get_ident() |
| 535 | self.assertIn(main_id, d) |
| 536 | self.assertIn(thread_id, d) |
| 537 | |
| 538 | # Verify that the captured main-thread frame is _this_ frame. |
| 539 | frame = d.pop(main_id) |
| 540 | self.assertTrue(frame is sys._getframe()) |
| 541 | |
| 542 | # Verify that the captured thread frame is blocked in g456, called |
| 543 | # from f123. This is a little tricky, since various bits of |
| 544 | # threading.py are also in the thread's call stack. |
| 545 | frame = d.pop(thread_id) |
| 546 | stack = traceback.extract_stack(frame) |
| 547 | for i, (filename, lineno, funcname, sourceline) in enumerate(stack): |
| 548 | if funcname == "f123": |
| 549 | break |
| 550 | else: |
| 551 | self.fail("didn't find f123() on thread's call stack") |
| 552 | |
| 553 | self.assertEqual(sourceline, "g456()") |
| 554 | |
| 555 | # And the next record must be for g456(). |
| 556 | filename, lineno, funcname, sourceline = stack[i+1] |
nothing calls this directly
no test coverage detected