MCPcopy
hub / github.com/celery/celery / dedup_worker

Function dedup_worker

t/integration/test_dedup_chain_dispatch.py:24–50  ·  view source on GitHub ↗

Solo worker with dedup enabled. Temporarily enables ``worker_deduplicate_successful_tasks`` on the session app, starts a solo worker, and restores the original setting on teardown.

(celery_session_app)

Source from the content-addressed store, hash-verified

22
23@pytest.fixture()
24def dedup_worker(celery_session_app):
25 """Solo worker with dedup enabled.
26
27 Temporarily enables ``worker_deduplicate_successful_tasks`` on the
28 session app, starts a solo worker, and restores the original
29 setting on teardown.
30 """
31 if not celery_session_app.backend.persistent:
32 raise pytest.skip('Requires a persistent result backend.')
33
34 orig_dedup = celery_session_app.conf.worker_deduplicate_successful_tasks
35 orig_acks_late = celery_session_app.conf.task_acks_late
36 celery_session_app.conf.worker_deduplicate_successful_tasks = True
37 celery_session_app.conf.task_acks_late = True
38
39 try:
40 with start_worker(
41 celery_session_app,
42 pool='solo',
43 concurrency=1,
44 perform_ping_check=False,
45 shutdown_timeout=TIMEOUT,
46 ) as worker:
47 yield worker
48 finally:
49 celery_session_app.conf.worker_deduplicate_successful_tasks = orig_dedup
50 celery_session_app.conf.task_acks_late = orig_acks_late
51
52
53class test_dedup_chain_dispatch:

Callers

nothing calls this directly

Calls 1

start_workerFunction · 0.90

Tested by

no test coverage detected