MCPcopy
hub / github.com/celery/celery / assert_container_exited

Function assert_container_exited

t/smoke/tests/test_worker.py:15–25  ·  view source on GitHub ↗

It might take a few moments for the container to exit after the worker is killed.

(worker: CeleryTestWorker, attempts: int = RESULT_TIMEOUT)

Source from the content-addressed store, hash-verified

13
14
15def assert_container_exited(worker: CeleryTestWorker, attempts: int = RESULT_TIMEOUT):
16 """It might take a few moments for the container to exit after the worker is killed."""
17 while attempts:
18 worker.container.reload()
19 if worker.container.status == "exited":
20 break
21 attempts -= 1
22 sleep(1)
23
24 worker.container.reload()
25 assert worker.container.status == "exited"
26
27
28@pytest.mark.parametrize("method", list(WorkerRestart.Method))

Calls 1

reloadMethod · 0.80

Tested by

no test coverage detected