(app, celery_session_worker)
| 103 | |
| 104 | @pytest.fixture |
| 105 | def manager(app, celery_session_worker): |
| 106 | manager = Manager(app) |
| 107 | yield manager |
| 108 | try: |
| 109 | manager.wait_until_idle() |
| 110 | except Exception as e: |
| 111 | logger.warning("Failed to stop Celery test manager cleanly: %s", e) |
| 112 | |
| 113 | |
| 114 | @pytest.fixture(autouse=True) |
nothing calls this directly
no test coverage detected