(self)
| 136 | } |
| 137 | |
def test_terminate_job(self):
    """Check that ``terminate_job`` links and kills the tracked job entry.

    A single callable is applied to a started pool; the one entry that
    shows up in ``pool._pool_map`` is then terminated by its key, and the
    entry's ``link`` and ``kill`` are each expected to be called once.
    """
    pool = TaskPool(10)
    pool.on_start()
    pool.on_apply(Mock())

    # One apply should leave exactly one tracked entry in the pool map.
    tracked_pids = list(pool._pool_map.keys())
    assert len(tracked_pids) == 1

    job_pid = tracked_pids[0]
    # Grab the entry before terminating so the assertions below do not
    # depend on whether terminate_job drops it from the map.
    job_greenlet = pool._pool_map[job_pid]

    pool.terminate_job(job_pid)

    # NOTE(review): the assert_* calls assume the stored entry is a Mock
    # (pool internals patched by the surrounding fixture) -- confirm.
    job_greenlet.link.assert_called_once()
    job_greenlet.kill.assert_called_once()
| 151 | |
| 152 | def test_make_killable_target(self): |
| 153 | def valid_target(): |
nothing calls this directly
no test coverage detected