MCPcopy Index your code
hub / github.com/python/cpython / test_wait_integer

Method test_wait_integer

Lib/test/_test_multiprocessing.py:5681–5722  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.requires_resource('walltime')
def test_wait_integer(self):
    """Check ``wait()`` with a process sentinel passed as a plain integer.

    A child process runs ``self.signal_and_sleep(sem, expected)``
    (presumably it releases ``sem`` and then sleeps for ``expected``
    seconds -- confirm against that helper).  The test then verifies, in
    order, that:

    1. ``wait([a, p.sentinel, b], ...)`` returns only the sentinel, after
       roughly ``expected`` seconds (within +/- 2 seconds);
    2. after ``a.send(None)`` the sentinel and ``b`` are reported, in
       under 0.4 seconds;
    3. after ``b.send(None)`` all three objects are reported, in under
       0.4 seconds.

    The child process is terminated and joined, and both pipe ends are
    closed, even when one of the assertions fails.
    """
    from multiprocessing.connection import wait

    def by_identity(objs):
        # Order by object identity so that comparing two lists does not
        # depend on the order in which wait() reports ready objects.
        return sorted(objs, key=id)

    expected = 3
    sem = multiprocessing.Semaphore(0)
    a, b = multiprocessing.Pipe()

    # Connection objects are context managers; leaving the block closes
    # both ends of the pipe whether or not the test passed.
    with a, b:
        p = multiprocessing.Process(target=self.signal_and_sleep,
                                    args=(sem, expected))
        p.start()
        try:
            self.assertIsInstance(p.sentinel, int)
            # Do not start timing until the child has signalled.
            self.assertTrue(sem.acquire(timeout=20))

            # Nothing has been sent on the pipe yet, so only the
            # sentinel is expected, after about ``expected`` seconds.
            start = time.monotonic()
            res = wait([a, p.sentinel, b], expected + 20)
            delta = time.monotonic() - start

            self.assertEqual(res, [p.sentinel])
            self.assertLess(delta, expected + 2)
            self.assertGreater(delta, expected - 2)

            a.send(None)

            # The sentinel is still reported, now together with ``b``;
            # the call is expected to return promptly.
            start = time.monotonic()
            res = wait([a, p.sentinel, b], 20)
            delta = time.monotonic() - start

            self.assertEqual(by_identity(res),
                             by_identity([p.sentinel, b]))
            self.assertLess(delta, 0.4)

            b.send(None)

            # Now every object in the list is expected to be reported.
            start = time.monotonic()
            res = wait([a, p.sentinel, b], 20)
            delta = time.monotonic() - start

            self.assertEqual(by_identity(res),
                             by_identity([a, p.sentinel, b]))
            self.assertLess(delta, 0.4)
        finally:
            # Always reap the child so a failed assertion cannot leave
            # a process behind.
            p.terminate()
            p.join()
5723
5724 @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
5725 def test_neg_timeout(self):

Callers

nothing calls this directly

Calls 15

acquire · Method · 0.95
terminate · Method · 0.95
wait · Function · 0.90
id · Function · 0.85
Semaphore · Method · 0.80
Pipe · Method · 0.80
assertIsInstance · Method · 0.80
assertTrue · Method · 0.80
assertGreater · Method · 0.80
Process · Method · 0.45
start · Method · 0.45
assertEqual · Method · 0.45

Tested by

no test coverage detected