MCPcopy
hub / github.com/celery/celery / heartbeat_worker

Function heartbeat_worker

t/integration/test_prefork_shutdown.py:27–51  ·  view source on GitHub ↗

Worker with short heartbeat for testing purposes.

(celery_session_app)

Source from the content-addressed store, hash-verified

25
26@pytest.fixture
27def heartbeat_worker(celery_session_app):
28 """Worker with short heartbeat for testing purposes."""
29
30 # Temporarily lower heartbeat for this test
31 original_heartbeat = celery_session_app.conf.broker_heartbeat
32 celery_session_app.conf.broker_heartbeat = TEST_HEARTBEAT
33
34 original_acks_late = celery_session_app.conf.task_acks_late
35 celery_session_app.conf.task_acks_late = True
36
37 with start_worker(
38 celery_session_app,
39 pool="prefork",
40 without_heartbeat=False,
41 concurrency=4,
42 shutdown_timeout=TIMEOUT,
43 perform_ping_check=False,
44 ) as worker:
45 # Verify that low heartbeat is configured correctly
46 assert worker.consumer.amqheartbeat == TEST_HEARTBEAT
47
48 yield worker
49
50 celery_session_app.conf.broker_heartbeat = original_heartbeat
51 celery_session_app.conf.task_acks_late = original_acks_late
52
53
54class test_prefork_shutdown:

Callers

nothing calls this directly

Calls 1

start_workerFunction · 0.90

Tested by

no test coverage detected