MCPcopy
hub / github.com/celery/celery / test_start

Method test_start

t/unit/app/test_beat.py:848–872  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

846 assert loads(dumps(s))
847
def test_start(self):
    """Check schedule contents, sync/stop flags, and a store-less sync.

    Verifies that every key under ``'entries'`` in the store returned by
    ``get_service`` is also a key of the scheduler's schedule, that
    ``sync`` leaves the store flagged ``closed`` and ``synced`` with
    ``_is_stopped`` set, that ``stop`` sets ``_is_shutdown`` both with
    ``wait=False`` and ``wait=True``, and that the scheduler's own
    ``sync`` can be invoked while its ``_store`` is ``None``.
    """
    service, store = self.get_service()

    schedule = service.scheduler.schedule
    assert isinstance(schedule, dict)
    assert isinstance(service.scheduler, beat.Scheduler)

    # Every entry name held by the store must be present in the schedule.
    known_names = set(schedule.keys())
    missing = [
        task_name
        for task_name in store['entries'].keys()
        if task_name not in known_names
    ]
    assert not missing

    service.sync()
    assert store.closed
    assert store.synced
    assert service._is_stopped.is_set()

    # A second sync followed by repeated stops must still leave the
    # shutdown flag set.
    service.sync()
    for wait in (False, True):
        service.stop(wait=wait)
        assert service._is_shutdown.is_set()

    # Temporarily detach the store; always restore it afterwards so the
    # scheduler is left as it was found even if sync() raises.
    saved_store = service.scheduler._store
    service.scheduler._store = None
    try:
        service.scheduler.sync()
    finally:
        service.scheduler._store = saved_store
873
874 def test_start_embedded_process(self):
875 s, sh = self.get_service()

Callers

nothing calls this directly

Calls 5

get_service — Method · 0.95
keys — Method · 0.80
is_set — Method · 0.80
sync — Method · 0.45
stop — Method · 0.45

Tested by

no test coverage detected