MCPcopy
hub / github.com/celery/celery / send_task

Method send_task

celery/app/base.py:842–981  ·  celery/app/base.py::Celery.send_task

Send task by name. Supports the same arguments as :meth:`@-Task.apply_async`. Arguments: name (str): Name of task to call (e.g., `"tasks.add"`). result_cls (AsyncResult): Specify custom result class.

(self, name, args=None, kwargs=None, countdown=None,
                  eta=None, task_id=None, producer=None, connection=None,
                  router=None, result_cls=None, expires=None,
                  publisher=None, link=None, link_error=None,
                  add_to_parent=True, group_id=None, group_index=None,
                  retries=0, chord=None,
                  reply_to=None, time_limit=None, soft_time_limit=None,
                  root_id=None, parent_id=None, route_name=None,
                  shadow=None, chain=None, task_type=None, replaced_task_nesting=0, **options)

Source from the content-addressed store, hash-verified

840 ], related_name=related_name)
841
842 def send_task(self, name, args=None, kwargs=None, countdown=None,
843 eta=None, task_id=None, producer=None, connection=None,
844 router=None, result_cls=None, expires=None,
845 publisher=None, link=None, link_error=None,
846 add_to_parent=True, group_id=None, group_index=None,
847 retries=0, chord=None,
848 reply_to=None, time_limit=None, soft_time_limit=None,
849 root_id=None, parent_id=None, route_name=None,
850 shadow=None, chain=None, task_type=None, replaced_task_nesting=0, **options):
851 class="st">"""Send task by name.
852
853 Supports the same arguments as :meth:`@-Task.apply_async`.
854
855 Arguments:
856 name (str): Name of task to call (e.g., `class="st">"tasks.add"`).
857 result_cls (AsyncResult): Specify custom result class.
858 class="st">"""
859 parent = have_parent = None
860 amqp = self.amqp
861 task_id = task_id or uuid()
862 producer = producer or publisher class="cm"># XXX compat
863 router = router or amqp.router
864 conf = self.conf
865 if conf.task_always_eager: class="cm"># pragma: no cover
866 warnings.warn(AlwaysEagerIgnored(
867 &class="cm">#x27;task_always_eager has no effect on send_task',
868 ), stacklevel=2)
869
870 ignore_result = options.pop(&class="cm">#x27;ignore_result', False)
871 options = router.route(
872 options, route_name or name, args, kwargs, task_type)
873
874 driver_type = self.producer_pool.connections.connection.transport.driver_type
875
876 if (eta or countdown) and detect_quorum_queues(self, driver_type)[0]:
877
878 queue = options.get(class="st">"queue")
879 exchange_type = queue.exchange.type if queue else options[class="st">"exchange_type"]
880 routing_key = queue.routing_key if queue else options[class="st">"routing_key"]
881 exchange_name = queue.exchange.name if queue else options[class="st">"exchange"]
882
883 if exchange_type != &class="cm">#x27;direct':
884 if eta:
885 if isinstance(eta, str):
886 eta = isoparse(eta)
887 countdown = (maybe_make_aware(eta) - self.now()).total_seconds()
888
889 if countdown:
890 if countdown > 0:
891 routing_key = calculate_routing_key(int(countdown), routing_key)
892 exchange = Exchange(
893 &class="cm">#x27;celery_delayed_27',
894 type=&class="cm">#x27;topic',
895 )
896 options.pop(class="st">"queue", None)
897 options[&class="cm">#x27;routing_key'] = routing_key
898 options[&class="cm">#x27;exchange'] = exchange
899

Callers 2

apply_asyncMethod · 0.45
callFunction · 0.45

Calls 14

nowMethod · 0.95
producer_or_acquireMethod · 0.95
AlwaysEagerIgnoredClass · 0.90
maybe_make_awareFunction · 0.90
detect_quorum_queuesFunction · 0.85
maybe_listFunction · 0.85
routeMethod · 0.80
send_task_messageMethod · 0.80
add_trailMethod · 0.80
popMethod · 0.45
getMethod · 0.45
setdefaultMethod · 0.45

Tested by

no test coverage detected