(self, request, exc, traceback)
| 242 | self._call_task_errbacks(request, exc, traceback) |
| 243 | |
| 244 | def _call_task_errbacks(self, request, exc, traceback): |
| 245 | old_signature = [] |
| 246 | for errback in request.errbacks: |
| 247 | errback = self.app.signature(errback) |
| 248 | if not errback._app: |
| 249 | # Ensure all signatures have an application |
| 250 | errback._app = self.app |
| 251 | try: |
| 252 | if ( |
| 253 | # Celery tasks type created with the @task decorator have |
| 254 | # the __header__ property, but Celery task created from |
| 255 | # Task class do not have this property. |
| 256 | # That's why we have to check if this property exists |
| 257 | # before checking is it partial function. |
| 258 | hasattr(errback.type, '__header__') and |
| 259 | |
| 260 | # workaround to support tasks with bind=True executed as |
| 261 | # link errors. Otherwise, retries can't be used |
| 262 | not isinstance(errback.type.__header__, partial) and |
| 263 | arity_greater(errback.type.__header__, 1) |
| 264 | ): |
| 265 | errback(request, exc, traceback) |
| 266 | else: |
| 267 | old_signature.append(errback) |
| 268 | except NotRegistered: |
| 269 | # Task may not be present in this worker. |
| 270 | # We simply send it forward for another worker to consume. |
| 271 | # If the task is not registered there, the worker will raise |
| 272 | # NotRegistered. |
| 273 | old_signature.append(errback) |
| 274 | |
| 275 | if old_signature: |
| 276 | # Previously errback was called as a task so we still |
| 277 | # need to do so if the errback only takes a single task_id arg. |
| 278 | task_id = request.id |
| 279 | root_id = request.root_id or task_id |
| 280 | g = group(old_signature, app=self.app) |
| 281 | if self.app.conf.task_always_eager or request.delivery_info.get('is_eager', False): |
| 282 | g.apply( |
| 283 | (task_id,), parent_id=task_id, root_id=root_id |
| 284 | ) |
| 285 | else: |
| 286 | g.apply_async( |
| 287 | (task_id,), parent_id=task_id, root_id=root_id |
| 288 | ) |
| 289 | |
| 290 | def mark_as_revoked(self, task_id, reason='', |
| 291 | request=None, store_result=True, state=states.REVOKED): |
no test coverage detected