MCPcopy
hub / github.com/celery/celery / test_waitexec

Method test_waitexec

t/unit/apps/test_multi.py:239–261  ·  view source on GitHub ↗
(self, Popen, argv=['A', 'B'])

Source from the content-addressed store, hash-verified

237
238 @patch('celery.apps.multi.Popen')
239 def test_waitexec(self, Popen, argv=['A', 'B']):
240 on_spawn = Mock(name='on_spawn')
241 on_signalled = Mock(name='on_signalled')
242 on_failure = Mock(name='on_failure')
243 env = Mock(name='env')
244 self.node.handle_process_exit = Mock(name='handle_process_exit')
245
246 self.node._waitexec(
247 argv,
248 path='python',
249 env=env,
250 on_spawn=on_spawn,
251 on_signalled=on_signalled,
252 on_failure=on_failure,
253 )
254
255 Popen.assert_called_with(
256 self.node.prepare_argv(argv, 'python'), env=env)
257 self.node.handle_process_exit.assert_called_with(
258 Popen().wait(),
259 on_signalled=on_signalled,
260 on_failure=on_failure,
261 )
262
263 def test_handle_process_exit(self):
264 assert self.node.handle_process_exit(0) == 0

Callers

nothing calls this directly

Calls 3

_waitexecMethod · 0.80
prepare_argvMethod · 0.80
waitMethod · 0.45

Tested by

no test coverage detected