Solo worker with dedup enabled. Temporarily enables ``worker_deduplicate_successful_tasks`` on the session app, starts a solo worker, and restores the original setting on teardown.
(celery_session_app)
| 22 | |
| 23 | @pytest.fixture() |
| 24 | def dedup_worker(celery_session_app): |
| 25 | """Solo worker with dedup enabled. |
| 26 | |
| 27 | Temporarily enables ``worker_deduplicate_successful_tasks`` on the |
| 28 | session app, starts a solo worker, and restores the original |
| 29 | setting on teardown. |
| 30 | """ |
| 31 | if not celery_session_app.backend.persistent: |
| 32 | raise pytest.skip('Requires a persistent result backend.') |
| 33 | |
| 34 | orig_dedup = celery_session_app.conf.worker_deduplicate_successful_tasks |
| 35 | orig_acks_late = celery_session_app.conf.task_acks_late |
| 36 | celery_session_app.conf.worker_deduplicate_successful_tasks = True |
| 37 | celery_session_app.conf.task_acks_late = True |
| 38 | |
| 39 | try: |
| 40 | with start_worker( |
| 41 | celery_session_app, |
| 42 | pool='solo', |
| 43 | concurrency=1, |
| 44 | perform_ping_check=False, |
| 45 | shutdown_timeout=TIMEOUT, |
| 46 | ) as worker: |
| 47 | yield worker |
| 48 | finally: |
| 49 | celery_session_app.conf.worker_deduplicate_successful_tasks = orig_dedup |
| 50 | celery_session_app.conf.task_acks_late = orig_acks_late |
| 51 | |
| 52 | |
| 53 | class test_dedup_chain_dispatch: |
nothing calls this directly
no test coverage detected