(self)
| 127 | |
@threading_helper.requires_working_threading()
def test_cancel_concurrent(self):
    """Cancel queued events while ``scheduler.run()`` runs in another thread.

    Five events are scheduled at offsets 1 through 5 from the initial timer
    reading, each putting its own offset on a queue.  After the first event
    has been delivered, the events at offsets 2 and 5 are cancelled.  The
    test then checks that only 1, 3 and 4 ever reach the queue and that
    ``timer.time()`` equals 4 once the scheduler thread has been joined.
    """
    fired = queue.Queue()
    record = fired.put  # bound once so every event shares the same action
    clock = Timer()
    s = sched.scheduler(clock.time, clock.sleep)
    base = clock.time()

    # Events are deliberately entered out of chronological order
    # (offset 3 goes in last); keep every handle so any can be cancelled.
    pending = {}
    for offset in (1, 2, 4, 5, 3):
        pending[offset] = s.enterabs(base + offset, 1, record, (offset,))

    worker = threading.Thread(target=s.run)
    worker.start()

    # NOTE(review): Timer is defined outside this view; advance() presumably
    # lets the simulated clock move forward for the scheduler thread -- confirm.
    clock.advance(1)
    self.assertEqual(fired.get(timeout=TIMEOUT), 1)
    self.assertTrue(fired.empty())

    # Cancel two still-pending events while run() is active in the worker.
    for offset in (2, 5):
        s.cancel(pending[offset])

    # The slot of the cancelled offset-2 event must pass without output.
    clock.advance(1)
    self.assertTrue(fired.empty())

    # The surviving events arrive one per step, with nothing extra queued.
    for expected in (3, 4):
        clock.advance(1)
        self.assertEqual(fired.get(timeout=TIMEOUT), expected)
        self.assertTrue(fired.empty())

    # Open up plenty of time so run() can return, then wait for the thread.
    clock.advance(1000)
    threading_helper.join_thread(worker)
    self.assertTrue(fired.empty())
    # The cancelled offset-5 event must not have moved the clock past 4.
    self.assertEqual(clock.time(), 4)
| 159 | |
| 160 | def test_cancel_correct_event(self): |
| 161 | # bpo-19270 |
nothing calls this directly
no test coverage detected