MCPcopy
hub / github.com/celery/celery / remaining_estimate

Method remaining_estimate

celery/schedules.py:828–861  ·  view source on GitHub ↗

Return estimate of next time to run. Returns: ~datetime.timedelta: when the periodic task should run next, or if it shouldn't run today (e.g., the sun does not rise today), returns the time when the next check should take place.

(self, last_run_at: datetime)

Source from the content-addressed store, hash-verified

826 )
827
828 def remaining_estimate(self, last_run_at: datetime) -> timedelta:
829 """Return estimate of next time to run.
830
831 Returns:
832 ~datetime.timedelta: when the periodic task should
833 run next, or if it shouldn't run today (e.g., the sun does
834 not rise today), returns the time when the next check
835 should take place.
836 """
837 last_run_at = self.maybe_make_aware(last_run_at)
838 last_run_at_utc = localize(last_run_at, timezone.utc)
839 self.cal.date = last_run_at_utc
840 try:
841 if self.use_center:
842 next_utc = getattr(self.cal, self.method)(
843 self.ephem.Sun(),
844 start=last_run_at_utc, use_center=self.use_center
845 )
846 else:
847 next_utc = getattr(self.cal, self.method)(
848 self.ephem.Sun(), start=last_run_at_utc
849 )
850
851 except self.ephem.CircumpolarError: # pragma: no cover
852 # Sun won't rise/set today. Check again tomorrow
853 # (specifically, after the next anti-transit).
854 next_utc = (
855 self.cal.next_antitransit(self.ephem.Sun()) +
856 timedelta(minutes=1)
857 )
858 next = self.maybe_make_aware(next_utc.datetime())
859 now = self.maybe_make_aware(self.now())
860 delta = next - now
861 return delta
862
863 def is_due(self, last_run_at: datetime) -> tuple[bool, datetime]:
864 """Return tuple of ``(is_due, next_time_to_run)``.

Callers 7

is_dueMethod · 0.95
next_occurrenceMethod · 0.45

Calls 3

localizeFunction · 0.85
maybe_make_awareMethod · 0.80
nowMethod · 0.45