MCPcopy
hub / github.com/celery/celery / create_task_handler

Method create_task_handler

celery/worker/consumer/consumer.py:642–705  ·  view source on GitHub ↗
(self, promise=promise)

Source from the content-addressed store, hash-verified

640 app=self.app)
641
642 def create_task_handler(self, promise=promise):
643 strategies = self.strategies
644 on_unknown_message = self.on_unknown_message
645 on_unknown_task = self.on_unknown_task
646 on_invalid_task = self.on_invalid_task
647 callbacks = self.on_task_message
648 call_soon = self.call_soon
649
650 def on_task_received(message):
651 # payload will only be set for v1 protocol, since v2
652 # will defer deserializing the message body to the pool.
653 payload = None
654 try:
655 type_ = message.headers['task'] # protocol v2
656 except TypeError:
657 return on_unknown_message(None, message)
658 except KeyError:
659 try:
660 payload = message.decode()
661 except Exception as exc: # pylint: disable=broad-except
662 return self.on_decode_error(message, exc)
663 try:
664 type_, payload = payload['task'], payload # protocol v1
665 except (TypeError, KeyError):
666 return on_unknown_message(payload, message)
667 try:
668 strategy = strategies[type_]
669 except KeyError as exc:
670 return on_unknown_task(None, message, exc)
671 else:
672 try:
673 ack_log_error_promise = promise(
674 call_soon,
675 (message.ack_log_error,),
676 on_error=self._restore_prefetch_count_after_connection_restart,
677 )
678 reject_log_error_promise = promise(
679 call_soon,
680 (message.reject_log_error,),
681 on_error=self._restore_prefetch_count_after_connection_restart,
682 )
683
684 if (
685 not self._maximum_prefetch_restored
686 and self.restart_count > 0
687 and self._new_prefetch_count <= self.max_prefetch_count
688 ):
689 ack_log_error_promise.then(self._restore_prefetch_count_after_connection_restart,
690 on_error=self._restore_prefetch_count_after_connection_restart)
691 reject_log_error_promise.then(self._restore_prefetch_count_after_connection_restart,
692 on_error=self._restore_prefetch_count_after_connection_restart)
693
694 strategy(
695 message, payload,
696 ack_log_error_promise,
697 reject_log_error_promise,
698 callbacks,
699 )

Callers 3

asynloopFunction · 0.80
synloopFunction · 0.80

Calls

no outgoing calls

Tested by 1