(self, promise=promise)
| 640 | app=self.app) |
| 641 | |
| 642 | def create_task_handler(self, promise=promise): |
| 643 | strategies = self.strategies |
| 644 | on_unknown_message = self.on_unknown_message |
| 645 | on_unknown_task = self.on_unknown_task |
| 646 | on_invalid_task = self.on_invalid_task |
| 647 | callbacks = self.on_task_message |
| 648 | call_soon = self.call_soon |
| 649 | |
| 650 | def on_task_received(message): |
| 651 | # payload will only be set for v1 protocol, since v2 |
| 652 | # will defer deserializing the message body to the pool. |
| 653 | payload = None |
| 654 | try: |
| 655 | type_ = message.headers['task'] # protocol v2 |
| 656 | except TypeError: |
| 657 | return on_unknown_message(None, message) |
| 658 | except KeyError: |
| 659 | try: |
| 660 | payload = message.decode() |
| 661 | except Exception as exc: # pylint: disable=broad-except |
| 662 | return self.on_decode_error(message, exc) |
| 663 | try: |
| 664 | type_, payload = payload['task'], payload # protocol v1 |
| 665 | except (TypeError, KeyError): |
| 666 | return on_unknown_message(payload, message) |
| 667 | try: |
| 668 | strategy = strategies[type_] |
| 669 | except KeyError as exc: |
| 670 | return on_unknown_task(None, message, exc) |
| 671 | else: |
| 672 | try: |
| 673 | ack_log_error_promise = promise( |
| 674 | call_soon, |
| 675 | (message.ack_log_error,), |
| 676 | on_error=self._restore_prefetch_count_after_connection_restart, |
| 677 | ) |
| 678 | reject_log_error_promise = promise( |
| 679 | call_soon, |
| 680 | (message.reject_log_error,), |
| 681 | on_error=self._restore_prefetch_count_after_connection_restart, |
| 682 | ) |
| 683 | |
| 684 | if ( |
| 685 | not self._maximum_prefetch_restored |
| 686 | and self.restart_count > 0 |
| 687 | and self._new_prefetch_count <= self.max_prefetch_count |
| 688 | ): |
| 689 | ack_log_error_promise.then(self._restore_prefetch_count_after_connection_restart, |
| 690 | on_error=self._restore_prefetch_count_after_connection_restart) |
| 691 | reject_log_error_promise.then(self._restore_prefetch_count_after_connection_restart, |
| 692 | on_error=self._restore_prefetch_count_after_connection_restart) |
| 693 | |
| 694 | strategy( |
| 695 | message, payload, |
| 696 | ack_log_error_promise, |
| 697 | reject_log_error_promise, |
| 698 | callbacks, |
| 699 | ) |
no outgoing calls