hub / github.com/celery/celery / test_terminate_job

Method test_terminate_job

t/unit/concurrency/test_gevent.py:85–99
(self)

Source from the content-addressed store, hash-verified

83          assert x.num_processes == 3
84
85      def test_terminate_job(self):
86          func = Mock()
87          pool = TaskPool(10)
88          pool.on_start()
89          pool.on_apply(func)
90
91          assert len(pool._pool_map.keys()) == 1
92          pid = list(pool._pool_map.keys())[0]
93          greenlet = pool._pool_map[pid]
94          greenlet.link.assert_called_once()
95
96          pool.terminate_job(pid)
97          import gevent
98
99          gevent.kill.assert_called_once()
100
101     def test_make_killable_target(self):
102         def valid_target():
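
The pattern this test exercises (apply a job, look it up in the pool's job map, terminate it by id, then assert on the mocked kill call) can be sketched without gevent or Celery installed. The block below is a minimal illustration, not Celery's implementation: `SketchPool`, its `spawn` and `kill` parameters, and `run_sketch` are hypothetical names, and the cleanup callback passed to `link` is an assumption of this sketch.

```python
from unittest.mock import Mock


class SketchPool:
    """Hypothetical stand-in for a greenlet pool; NOT celery's TaskPool."""

    def __init__(self, spawn, kill):
        self._spawn = spawn    # callable that starts a job and returns a handle
        self._kill = kill      # callable that stops a running handle
        self._pool_map = {}    # job id -> handle
        self._next_id = 0

    def on_apply(self, target):
        handle = self._spawn(target)
        job_id = self._next_id
        self._next_id += 1
        self._pool_map[job_id] = handle
        # Assumption: register a callback that forgets the job once it finishes.
        handle.link(lambda _h, job_id=job_id: self._pool_map.pop(job_id, None))
        return handle

    def terminate_job(self, job_id):
        handle = self._pool_map.get(job_id)
        if handle is not None:
            self._kill(handle)


def run_sketch():
    # Mocks replace the real spawn/kill functions, as the original test
    # relies on a mocked gevent module.
    spawn = Mock(name="spawn")
    kill = Mock(name="kill")
    pool = SketchPool(spawn, kill)
    pool.on_apply(Mock(name="func"))

    assert len(pool._pool_map) == 1
    job_id = next(iter(pool._pool_map))
    handle = pool._pool_map[job_id]
    handle.link.assert_called_once()

    pool.terminate_job(job_id)
    kill.assert_called_once_with(handle)
    return job_id, handle, kill
```

Injecting `spawn` and `kill` as constructor arguments is a choice made for this sketch so the assertions run with the standard library alone; the original test instead asserts on `gevent.kill` after importing `gevent` inside the test body.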

Callers

nothing calls this directly

Calls 5

on_start · Method · 0.95
on_apply · Method · 0.95
terminate_job · Method · 0.95
TaskPool · Class · 0.90
keys · Method · 0.80

Tested by

no test coverage detected