MCPcopy
hub / github.com/celery/celery / execute_using_pool

Method execute_using_pool

celery/worker/request.py:341–372  ·  view source on GitHub ↗

Used by the worker to send this task to the pool. Arguments: pool (~celery.concurrency.base.TaskPool): The execution pool used to execute this request. Raises: celery.exceptions.TaskRevokedError: if the task was revoked.

(self, pool: BasePool, **kwargs)

Source from the content-addressed store, hash-verified

339 return self._request_dict['correlation_id']
340
341 def execute_using_pool(self, pool: BasePool, **kwargs):
342 """Used by the worker to send this task to the pool.
343
344 Arguments:
345 pool (~celery.concurrency.base.TaskPool): The execution pool
346 used to execute this request.
347
348 Raises:
349 celery.exceptions.TaskRevokedError: if the task was revoked.
350 """
351 task_id = self.id
352 task = self._task
353 if self.revoked():
354 raise TaskRevokedError(task_id)
355
356 time_limit, soft_time_limit = self.time_limits
357 trace = fast_trace_task if self._app.use_fast_trace_task else trace_task_ret
358 result = pool.apply_async(
359 trace,
360 args=(self._type, task_id, self._request_dict, self._body,
361 self._content_type, self._content_encoding),
362 accept_callback=self.on_accepted,
363 timeout_callback=self.on_timeout,
364 callback=self.on_success,
365 error_callback=self.on_failure,
366 soft_timeout=soft_time_limit or task.soft_time_limit,
367 timeout=time_limit or task.time_limit,
368 correlation_id=task_id,
369 )
370 # cannot create weakref to None
371 self._apply_result = maybe(ref, result)
372 return result
373
374 def execute(self, loglevel=None, logfile=None):
375 """Execute the task in a :func:`~celery.app.trace.trace_task`.

Calls 4

revokedMethod · 0.95
TaskRevokedErrorClass · 0.90
maybeFunction · 0.90
apply_asyncMethod · 0.45