MCPcopy
hub / github.com/celery/celery / revoked

Method revoked

celery/worker/request.py:470–509  ·  view source on GitHub ↗

If revoked, skip task and mark state.

(self)

Source from the content-addressed store, hash-verified

468 terminated=terminated, signum=signum, expired=expired)
469
470 def revoked(self):
471 """If revoked, skip task and mark state."""
472 expired = False
473 if self._already_revoked:
474 return True
475 if self.expires:
476 expired = self.maybe_expire()
477 revoked_by_id = self.id in revoked_tasks
478 revoked_by_header, revoking_header = False, None
479
480 if not revoked_by_id and self.stamped_headers:
481 for stamp in self.stamped_headers:
482 if stamp in revoked_stamps:
483 revoked_header = revoked_stamps[stamp]
484 stamped_header = self._message.headers['stamps'][stamp]
485
486 if isinstance(stamped_header, (list, tuple)):
487 for stamped_value in stamped_header:
488 if stamped_value in maybe_list(revoked_header):
489 revoked_by_header = True
490 revoking_header = {stamp: stamped_value}
491 break
492 else:
493 revoked_by_header = any([
494 stamped_header in maybe_list(revoked_header),
495 stamped_header == revoked_header, # When the header is a single set value
496 ])
497 revoking_header = {stamp: stamped_header}
498 break
499
500 if any((expired, revoked_by_id, revoked_by_header)):
501 log_msg = 'Discarding revoked task: %s[%s]'
502 if revoked_by_header:
503 log_msg += ' (revoked by header: %s)' % revoking_header
504 info(log_msg, self.name, self.id)
505 self._announce_revoked(
506 'expired' if expired else 'revoked', False, None, expired,
507 )
508 return True
509 return False
510
511 def send_event(self, type, **fields):
512 if self._eventer and self._eventer.enabled and self.task.send_events:

Callers 3

execute_using_poolMethod · 0.95
executeMethod · 0.95
task_message_handlerFunction · 0.45

Calls 4

maybe_expireMethod · 0.95
_announce_revokedMethod · 0.95
maybe_listFunction · 0.85
infoFunction · 0.85

Tested by

no test coverage detected