(self, celery_setup: CeleryTestSetup)
| 97 | assert res.get(RESULT_TIMEOUT) |
| 98 | |
def test_cold_shutdown(self, celery_setup: CeleryTestSetup):
    """Check that SIGQUIT cold-shuts-down the worker and abandons the running task.

    Sends ``long_running_task`` to the worker's queue, waits for the worker
    to log that the task started, then signals the worker with SIGQUIT.
    The test then expects:

    * the cold-shutdown log line from the main process,
    * no "succeeded" log line for the submitted task,
    * the worker container to have exited, and
    * ``res.get`` to raise ``celery.exceptions.TimeoutError``.
    """
    worker = celery_setup.worker
    target_queue = celery_setup.worker.worker_queue

    # Build the immutable signature, route it to the worker's queue, dispatch.
    res = long_running_task.si(5, verbose=True).set(queue=target_queue).delay()

    # Only signal the worker once the task is known to be executing.
    worker.assert_log_exists("Starting long running task")
    self.kill_worker(worker, WorkerKill.Method.SIGQUIT)
    worker.assert_log_exists("worker: Cold shutdown (MainProcess)")

    # The task must not be reported as finished after the shutdown signal.
    worker.assert_log_does_not_exist(
        f"long_running_task[{res.id}] succeeded",
        timeout=10,
    )

    assert_container_exited(worker)

    # With the worker gone, waiting on the result is expected to time out.
    with pytest.raises(celery.exceptions.TimeoutError):
        res.get(timeout=5)
| 115 | def test_hard_shutdown_from_warm(self, celery_setup: CeleryTestSetup): |
| 116 | queue = celery_setup.worker.worker_queue |
nothing calls this directly
no test coverage detected