MCPcopy
hub / github.com/celery/celery / _create_task_sender

Method _create_task_sender

celery/app/amqp.py:479–600  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

477 return s
478
479 def _create_task_sender(self):
480 default_retry = self.app.conf.task_publish_retry
481 default_policy = self.app.conf.task_publish_retry_policy
482 default_delivery_mode = self.app.conf.task_default_delivery_mode
483 default_queue = self.default_queue
484 queues = self.queues
485 send_before_publish = signals.before_task_publish.send
486 before_receivers = signals.before_task_publish.receivers
487 send_after_publish = signals.after_task_publish.send
488 after_receivers = signals.after_task_publish.receivers
489
490 send_task_sent = signals.task_sent.send # XXX compat
491 sent_receivers = signals.task_sent.receivers
492
493 default_evd = self._event_dispatcher
494 default_exchange = self.default_exchange
495
496 default_rkey = self.app.conf.task_default_routing_key
497 default_serializer = self.app.conf.task_serializer
498 default_compressor = self.app.conf.task_compression
499
500 def send_task_message(producer, name, message,
501 exchange=None, routing_key=None, queue=None,
502 event_dispatcher=None,
503 retry=None, retry_policy=None,
504 serializer=None, delivery_mode=None,
505 compression=None, declare=None,
506 headers=None, exchange_type=None,
507 timeout=None, confirm_timeout=None, **kwargs):
508 retry = default_retry if retry is None else retry
509 headers2, properties, body, sent_event = message
510 if headers:
511 headers2.update(headers)
512 if kwargs:
513 properties.update(kwargs)
514
515 qname = queue
516 if queue is None and exchange is None:
517 queue = default_queue
518 if queue is not None:
519 if isinstance(queue, str):
520 qname, queue = queue, queues[queue]
521 else:
522 qname = queue.name
523
524 if delivery_mode is None:
525 try:
526 delivery_mode = queue.exchange.delivery_mode
527 except AttributeError:
528 pass
529 delivery_mode = delivery_mode or default_delivery_mode
530
531 if exchange_type is None:
532 try:
533 exchange_type = queue.exchange.type
534 except AttributeError:
535 exchange_type = 'direct'
536

Callers 1

send_task_message · Method · 0.95

Calls

no outgoing calls

Tested by

no test coverage detected