| 477 | return s |
| 478 | |
| 479 | def _create_task_sender(self): |
| 480 | default_retry = self.app.conf.task_publish_retry |
| 481 | default_policy = self.app.conf.task_publish_retry_policy |
| 482 | default_delivery_mode = self.app.conf.task_default_delivery_mode |
| 483 | default_queue = self.default_queue |
| 484 | queues = self.queues |
| 485 | send_before_publish = signals.before_task_publish.send |
| 486 | before_receivers = signals.before_task_publish.receivers |
| 487 | send_after_publish = signals.after_task_publish.send |
| 488 | after_receivers = signals.after_task_publish.receivers |
| 489 | |
| 490 | send_task_sent = signals.task_sent.send # XXX compat |
| 491 | sent_receivers = signals.task_sent.receivers |
| 492 | |
| 493 | default_evd = self._event_dispatcher |
| 494 | default_exchange = self.default_exchange |
| 495 | |
| 496 | default_rkey = self.app.conf.task_default_routing_key |
| 497 | default_serializer = self.app.conf.task_serializer |
| 498 | default_compressor = self.app.conf.task_compression |
| 499 | |
| 500 | def send_task_message(producer, name, message, |
| 501 | exchange=None, routing_key=None, queue=None, |
| 502 | event_dispatcher=None, |
| 503 | retry=None, retry_policy=None, |
| 504 | serializer=None, delivery_mode=None, |
| 505 | compression=None, declare=None, |
| 506 | headers=None, exchange_type=None, |
| 507 | timeout=None, confirm_timeout=None, **kwargs): |
| 508 | retry = default_retry if retry is None else retry |
| 509 | headers2, properties, body, sent_event = message |
| 510 | if headers: |
| 511 | headers2.update(headers) |
| 512 | if kwargs: |
| 513 | properties.update(kwargs) |
| 514 | |
| 515 | qname = queue |
| 516 | if queue is None and exchange is None: |
| 517 | queue = default_queue |
| 518 | if queue is not None: |
| 519 | if isinstance(queue, str): |
| 520 | qname, queue = queue, queues[queue] |
| 521 | else: |
| 522 | qname = queue.name |
| 523 | |
| 524 | if delivery_mode is None: |
| 525 | try: |
| 526 | delivery_mode = queue.exchange.delivery_mode |
| 527 | except AttributeError: |
| 528 | pass |
| 529 | delivery_mode = delivery_mode or default_delivery_mode |
| 530 | |
| 531 | if exchange_type is None: |
| 532 | try: |
| 533 | exchange_type = queue.exchange.type |
| 534 | except AttributeError: |
| 535 | exchange_type = 'direct' |
| 536 | |