hub / github.com/python/cpython / test_enter_concurrent

Method test_enter_concurrent

Lib/test/test_sched.py:62–89
(self)

    @threading_helper.requires_working_threading()
    def test_enter_concurrent(self):
        q = queue.Queue()
        fun = q.put
        timer = Timer()
        scheduler = sched.scheduler(timer.time, timer.sleep)
        scheduler.enter(1, 1, fun, (1,))
        scheduler.enter(3, 1, fun, (3,))
        t = threading.Thread(target=scheduler.run)
        t.start()
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 1)
        self.assertTrue(q.empty())
        for x in [4, 5, 2]:
            z = scheduler.enter(x - 1, 1, fun, (x,))
        timer.advance(2)
        self.assertEqual(q.get(timeout=TIMEOUT), 2)
        self.assertEqual(q.get(timeout=TIMEOUT), 3)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 4)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 5)
        self.assertTrue(q.empty())
        timer.advance(1000)
        threading_helper.join_thread(t)
        self.assertTrue(q.empty())
        self.assertEqual(timer.time(), 5)

    def test_priority(self):
        l = []
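
The test relies on a `Timer` helper whose definition is not shown on this page. The sketch below is one plausible shape for such a helper, assuming it is a virtual clock in which `sleep` blocks until `advance` has raised a time limit far enough. The names `FakeTimer` and `run_demo` are hypothetical and exist only for this illustration; the real helper in `test_sched.py` may differ. The demo replays a shortened version of the test's flow: an event is entered from the main thread while `scheduler.run` is blocked in a worker thread.

```python
import queue
import sched
import threading


class FakeTimer:
    """Hypothetical virtual clock; an assumption, not the helper from test_sched.py."""

    def __init__(self):
        self._cond = threading.Condition()
        self._time = 0  # current virtual time
        self._stop = 0  # limit up to which sleepers may move the clock

    def time(self):
        with self._cond:
            return self._time

    def sleep(self, t):
        # Block until the limit allows the clock to reach now + t.
        with self._cond:
            target = self._time + t
            while self._stop < target:
                self._time = self._stop
                self._cond.wait()
            self._time = target

    def advance(self, t):
        # Raise the limit and wake any thread blocked in sleep().
        with self._cond:
            self._stop += t
            self._cond.notify_all()


def run_demo():
    q = queue.Queue()
    timer = FakeTimer()
    scheduler = sched.scheduler(timer.time, timer.sleep)
    scheduler.enter(1, 1, q.put, (1,))
    scheduler.enter(3, 1, q.put, (3,))
    worker = threading.Thread(target=scheduler.run)
    worker.start()

    timer.advance(1)
    first = q.get(timeout=10)

    # Enter a new event while run() is active in the worker thread.
    # The virtual clock reads 1 here, so this event is due at time 2.
    scheduler.enter(1, 1, q.put, (2,))
    timer.advance(2)
    rest = [q.get(timeout=10), q.get(timeout=10)]

    timer.advance(1000)  # let run() drain the queue and return
    worker.join(timeout=10)
    return first, rest, timer.time()
```

Under these assumptions, `run_demo()` returns `(1, [2, 3], 3)`: the late-entered event fires before the one that was already queued for time 3, and the clock stops at the last due time rather than at the advanced limit, which mirrors the final `timer.time()` assertion in the test.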

Callers

nothing calls this directly

Calls 11

start · Method · 0.95
advance · Method · 0.95
assertEqual · Method · 0.95
get · Method · 0.95
assertTrue · Method · 0.95
empty · Method · 0.95
time · Method · 0.95
Queue · Method · 0.80
join_thread · Method · 0.80
Timer · Class · 0.70
enter · Method · 0.45

Tested by

no test coverage detected