hub / github.com/celery/celery / test_cold_shutdown

Method test_cold_shutdown

t/smoke/tests/test_worker.py:99–113
(self, celery_setup: CeleryTestSetup)

Source from the content-addressed store, hash-verified

        assert res.get(RESULT_TIMEOUT)

    def test_cold_shutdown(self, celery_setup: CeleryTestSetup):
        queue = celery_setup.worker.worker_queue
        worker = celery_setup.worker
        # Queue a long-running task on the worker's own queue.
        sig = long_running_task.si(5, verbose=True).set(queue=queue)
        res = sig.delay()

        # Wait until the task has started, then send SIGQUIT to the worker.
        worker.assert_log_exists("Starting long running task")
        self.kill_worker(worker, WorkerKill.Method.SIGQUIT)
        worker.assert_log_exists("worker: Cold shutdown (MainProcess)")
        # The in-flight task must not be reported as succeeded.
        worker.assert_log_does_not_exist(f"long_running_task[{res.id}] succeeded", timeout=10)

        assert_container_exited(worker)

        # No result was stored, so fetching it times out.
        with pytest.raises(celery.exceptions.TimeoutError):
            res.get(timeout=5)

    def test_hard_shutdown_from_warm(self, celery_setup: CeleryTestSetup):
        queue = celery_setup.worker.worker_queue

Callers

nothing calls this directly

Calls 7

assert_container_exited · Function · 0.85
si · Method · 0.80
kill_worker · Method · 0.80
set · Method · 0.45
delay · Method · 0.45
raises · Method · 0.45
get · Method · 0.45

Tested by

no test coverage detected