MCPcopy
hub / github.com/celery/celery / apply_async

Method apply_async

celery/beat.py:393–418  ·  view source on GitHub ↗
(self, entry, producer=None, advance=True, **kwargs)

Source from the content-addressed store, hash-verified

391 return new_entry
392
393 def apply_async(self, entry, producer=None, advance=True, **kwargs):
394 # Update time-stamps and run counts before we actually execute,
395 # so we have that done if an exception is raised (doesn't schedule
396 # forever.)
397 entry = self.reserve(entry) if advance else entry
398 task = self.app.tasks.get(entry.task)
399
400 try:
401 entry_args = _evaluate_entry_args(entry.args)
402 entry_kwargs = _evaluate_entry_kwargs(entry.kwargs)
403 if task:
404 return task.apply_async(entry_args, entry_kwargs,
405 producer=producer,
406 **entry.options)
407 else:
408 return self.send_task(entry.task, entry_args, entry_kwargs,
409 producer=producer,
410 **entry.options)
411 except Exception as exc: # pylint: disable=broad-except
412 reraise(SchedulingError, SchedulingError(
413 "Couldn't apply scheduled task {0.name}: {exc}".format(
414 entry, exc=exc)), sys.exc_info()[2])
415 finally:
416 self._tasks_since_sync += 1
417 if self.should_sync():
418 self._do_sync()
419
420 def send_task(self, *args, **kwargs):
421 return self.app.send_task(*args, **kwargs)

Callers 1

apply_entryMethod · 0.95

Calls 10

reserveMethod · 0.95
send_taskMethod · 0.95
should_syncMethod · 0.95
_do_syncMethod · 0.95
_evaluate_entry_argsFunction · 0.85
_evaluate_entry_kwargsFunction · 0.85
reraiseFunction · 0.85
SchedulingErrorClass · 0.85
getMethod · 0.45
formatMethod · 0.45

Tested by

no test coverage detected