(self, entry, producer=None, advance=True, **kwargs)
| 391 | return new_entry |
| 392 | |
def apply_async(self, entry, producer=None, advance=True, **kwargs):
    """Dispatch the task described by a schedule ``entry``.

    Arguments:
        entry: Schedule entry; its ``task``, ``args``, ``kwargs``,
            ``options`` and ``name`` attributes are read here.
        producer: Passed through unchanged as the ``producer`` keyword
            of the underlying send call.
        advance: When true, the entry is first replaced by the result
            of ``self.reserve(entry)``.
        **kwargs: Accepted but not used by this method.

    Returns:
        Whatever the underlying ``apply_async``/``send_task`` call
        returns.

    Raises:
        SchedulingError: If evaluating the entry arguments or sending
            the task raises any ``Exception``; the original traceback
            is kept.
    """
    # Reserve first: time-stamps and run counts are updated before the
    # task is actually executed, so a failure below cannot cause the
    # entry to be scheduled forever.
    if advance:
        entry = self.reserve(entry)
    registered = self.app.tasks.get(entry.task)

    try:
        call_args = _evaluate_entry_args(entry.args)
        call_kwargs = _evaluate_entry_kwargs(entry.kwargs)
        if not registered:
            # Nothing usable in the app's task registry under this key:
            # fall back to sending via ``entry.task`` directly.
            return self.send_task(
                entry.task, call_args, call_kwargs,
                producer=producer, **entry.options
            )
        return registered.apply_async(
            call_args, call_kwargs,
            producer=producer, **entry.options
        )
    except Exception as exc:  # pylint: disable=broad-except
        message = "Couldn't apply scheduled task {0.name}: {exc}".format(
            entry, exc=exc)
        reraise(SchedulingError, SchedulingError(message),
                sys.exc_info()[2])
    finally:
        # Runs on success and on failure alike: count the attempt and
        # sync when the scheduler reports that it is due.
        self._tasks_since_sync += 1
        if self.should_sync():
            self._do_sync()
| 419 | |
def send_task(self, *args, **kwargs):
    """Forward all arguments to ``self.app.send_task`` and return its result."""
    app = self.app
    return app.send_task(*args, **kwargs)
no test coverage detected