MCPcopy
hub / github.com/celery/celery / test_terminate_job

Method test_terminate_job

t/unit/concurrency/test_eventlet.py:138–150  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

136 }
137
138 def test_terminate_job(self):
139 func = Mock()
140 pool = TaskPool(10)
141 pool.on_start()
142 pool.on_apply(func)
143
144 assert len(pool._pool_map.keys()) == 1
145 pid = list(pool._pool_map.keys())[0]
146 greenlet = pool._pool_map[pid]
147
148 pool.terminate_job(pid)
149 greenlet.link.assert_called_once()
150 greenlet.kill.assert_called_once()
151
152 def test_make_killable_target(self):
153 def valid_target():

Callers

nothing calls this directly

Calls 5

on_startMethod · 0.95
on_applyMethod · 0.95
terminate_jobMethod · 0.95
TaskPoolClass · 0.90
keysMethod · 0.80

Tested by

no test coverage detected