(self, Popen, argv=['A', 'B'])
| 237 | |
| 238 | @patch('celery.apps.multi.Popen') |
| 239 | def test_waitexec(self, Popen, argv=['A', 'B']): |
| 240 | on_spawn = Mock(name='on_spawn') |
| 241 | on_signalled = Mock(name='on_signalled') |
| 242 | on_failure = Mock(name='on_failure') |
| 243 | env = Mock(name='env') |
| 244 | self.node.handle_process_exit = Mock(name='handle_process_exit') |
| 245 | |
| 246 | self.node._waitexec( |
| 247 | argv, |
| 248 | path='python', |
| 249 | env=env, |
| 250 | on_spawn=on_spawn, |
| 251 | on_signalled=on_signalled, |
| 252 | on_failure=on_failure, |
| 253 | ) |
| 254 | |
| 255 | Popen.assert_called_with( |
| 256 | self.node.prepare_argv(argv, 'python'), env=env) |
| 257 | self.node.handle_process_exit.assert_called_with( |
| 258 | Popen().wait(), |
| 259 | on_signalled=on_signalled, |
| 260 | on_failure=on_failure, |
| 261 | ) |
| 262 | |
| 263 | def test_handle_process_exit(self): |
| 264 | assert self.node.handle_process_exit(0) == 0 |
nothing calls this directly
no test coverage detected