Used by the worker to send this task to the pool. Arguments: pool (~celery.concurrency.base.TaskPool): The execution pool used to execute this request. Raises: celery.exceptions.TaskRevokedError: if the task was revoked.
(self, pool: BasePool, **kwargs)
| 339 | return self._request_dict['correlation_id'] |
| 340 | |
| 341 | def execute_using_pool(self, pool: BasePool, **kwargs): |
| 342 | """Used by the worker to send this task to the pool. |
| 343 | |
| 344 | Arguments: |
| 345 | pool (~celery.concurrency.base.TaskPool): The execution pool |
| 346 | used to execute this request. |
| 347 | |
| 348 | Raises: |
| 349 | celery.exceptions.TaskRevokedError: if the task was revoked. |
| 350 | """ |
| 351 | task_id = self.id |
| 352 | task = self._task |
| 353 | if self.revoked(): |
| 354 | raise TaskRevokedError(task_id) |
| 355 | |
| 356 | time_limit, soft_time_limit = self.time_limits |
| 357 | trace = fast_trace_task if self._app.use_fast_trace_task else trace_task_ret |
| 358 | result = pool.apply_async( |
| 359 | trace, |
| 360 | args=(self._type, task_id, self._request_dict, self._body, |
| 361 | self._content_type, self._content_encoding), |
| 362 | accept_callback=self.on_accepted, |
| 363 | timeout_callback=self.on_timeout, |
| 364 | callback=self.on_success, |
| 365 | error_callback=self.on_failure, |
| 366 | soft_timeout=soft_time_limit or task.soft_time_limit, |
| 367 | timeout=time_limit or task.time_limit, |
| 368 | correlation_id=task_id, |
| 369 | ) |
| 370 | # cannot create weakref to None |
| 371 | self._apply_result = maybe(ref, result) |
| 372 | return result |
| 373 | |
| 374 | def execute(self, loglevel=None, logfile=None): |
| 375 | """Execute the task in a :func:`~celery.app.trace.trace_task`. |