MCPcopy
hub / github.com/celery/celery / test_process_task

Method test_process_task

t/unit/worker/test_worker.py:950–961  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

948 assert 'Next ETA %s secs' in fmt
949
950 def test_process_task(self):
951 worker = self.worker
952 worker.pool = Mock()
953 channel = Mock()
954 m = self.create_task_message(
955 channel, self.foo_task.name,
956 args=[4, 8, 10], kwargs={},
957 )
958 task = Request(m, app=self.app)
959 worker._process_task(task)
960 assert worker.pool.apply_async.call_count == 1
961 worker.pool.stop()
962
963 def test_process_task_raise_base(self):
964 worker = self.worker

Callers

nothing calls this directly

Calls 4

RequestClass · 0.90
_process_taskMethod · 0.80
create_task_messageMethod · 0.45
stopMethod · 0.45

Tested by

no test coverage detected