hub / github.com/celery/celery / run

Method run

celery/utils/timer2.py:79–101
(self)

Source from the content-addressed store, hash-verified

    __next__ = next = _next_entry  # for 2to3

    def run(self) -> None:
        try:
            self.running = True
            self.scheduler: Iterator[Tuple[Optional[float], Optional[Entry]]] = iter(self.schedule)

            while not self.__is_shutdown.is_set():
                delay = self._next_entry()
                if delay:
                    if self.on_tick:
                        self.on_tick(delay)
                    if sleep is None:  # pragma: no cover
                        break
                    sleep(delay)
            try:
                self.__is_stopped.set()
            except TypeError:  # pragma: no cover
                # we lost the race at interpreter shutdown,
                # so gc collected built-in modules.
                pass
        except Exception as exc:
            logger.error('Thread Timer crashed: %r', exc, exc_info=True)
            sys.stderr.flush()
            os._exit(1)

    def stop(self) -> None:
        self.__is_shutdown.set()

Callers 3

test_on_tick · Method · 0.95
test_thread_crash · Method · 0.95
test_gc_race_lost · Method · 0.95

Calls 5

_next_entry · Method · 0.95
is_set · Method · 0.80
set · Method · 0.45
error · Method · 0.45
flush · Method · 0.45

Tested by 3

test_on_tick · Method · 0.76
test_thread_crash · Method · 0.76
test_gc_race_lost · Method · 0.76