(self)
| 948 | assert 'Next ETA %s secs' in fmt |
| 949 | |
def test_process_task(self):
    """Check that ``_process_task`` submits the request to the pool once.

    The worker's pool is swapped for a ``Mock`` so the number of
    ``apply_async`` calls can be inspected after processing.
    """
    consumer = self.worker
    consumer.pool = Mock()
    fake_channel = Mock()

    message = self.create_task_message(
        fake_channel,
        self.foo_task.name,
        args=[4, 8, 10],
        kwargs={},
    )
    request = Request(message, app=self.app)

    consumer._process_task(request)

    # Exactly one submission to the (mocked) pool is expected.
    assert consumer.pool.apply_async.call_count == 1
    consumer.pool.stop()
| 962 | |
| 963 | def test_process_task_raise_base(self): |
| 964 | worker = self.worker |
nothing calls this directly
no test coverage detected