MCPcopy Index your code
hub / github.com/python/cpython / test_cancel_concurrent

Method test_cancel_concurrent

Lib/test/test_sched.py:129–158  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

127
128 @threading_helper.requires_working_threading()
129 def test_cancel_concurrent(self):
130 q = queue.Queue()
131 fun = q.put
132 timer = Timer()
133 scheduler = sched.scheduler(timer.time, timer.sleep)
134 now = timer.time()
135 event1 = scheduler.enterabs(now + 1, 1, fun, (1,))
136 event2 = scheduler.enterabs(now + 2, 1, fun, (2,))
137 event4 = scheduler.enterabs(now + 4, 1, fun, (4,))
138 event5 = scheduler.enterabs(now + 5, 1, fun, (5,))
139 event3 = scheduler.enterabs(now + 3, 1, fun, (3,))
140 t = threading.Thread(target=scheduler.run)
141 t.start()
142 timer.advance(1)
143 self.assertEqual(q.get(timeout=TIMEOUT), 1)
144 self.assertTrue(q.empty())
145 scheduler.cancel(event2)
146 scheduler.cancel(event5)
147 timer.advance(1)
148 self.assertTrue(q.empty())
149 timer.advance(1)
150 self.assertEqual(q.get(timeout=TIMEOUT), 3)
151 self.assertTrue(q.empty())
152 timer.advance(1)
153 self.assertEqual(q.get(timeout=TIMEOUT), 4)
154 self.assertTrue(q.empty())
155 timer.advance(1000)
156 threading_helper.join_thread(t)
157 self.assertTrue(q.empty())
158 self.assertEqual(timer.time(), 4)
159
160 def test_cancel_correct_event(self):
161 # bpo-19270

Callers

nothing calls this directly

Calls 12

timeMethod · 0.95
startMethod · 0.95
advanceMethod · 0.95
assertEqualMethod · 0.95
getMethod · 0.95
assertTrueMethod · 0.95
emptyMethod · 0.95
QueueMethod · 0.80
enterabsMethod · 0.80
join_threadMethod · 0.80
TimerClass · 0.70
cancelMethod · 0.45

Tested by

no test coverage detected