Session fixture: start a worker that lives throughout the test suite.
(
request, # type: Any
celery_session_app, # type: Celery
celery_includes, # type: Sequence[str]
celery_class_tasks, # type: str
celery_worker_pool, # type: Any
celery_worker_parameters, # type: Mapping[str, Any]
)
| 83 | |
@pytest.fixture(scope='session')
def celery_session_worker(
    request,  # type: Any
    celery_session_app,  # type: Celery
    celery_includes,  # type: Sequence[str]
    celery_class_tasks,  # type: str
    celery_worker_pool,  # type: Any
    celery_worker_parameters,  # type: Mapping[str, Any]
):
    # type: (...) -> WorkController
    """Session Fixture: Start worker that lives throughout test suite.

    Before the worker is started, every module named in ``celery_includes``
    is imported through the app loader and every entry of
    ``celery_class_tasks`` is registered on ``celery_session_app``.  The
    worker is then started with ``worker.start_worker`` using
    ``celery_worker_pool`` as the pool and ``celery_worker_parameters`` as
    extra keyword arguments, and is yielded from inside that context
    manager, so the context is exited when the fixture is finalized.

    When the module-level ``NO_WORKER`` flag is truthy, nothing is imported,
    registered or started and the fixture yields ``None`` instead.

    ``request`` is accepted as a fixture argument but is not used here.

    NOTE(review): ``celery_class_tasks`` is annotated ``str`` but is
    iterated and each element is passed to ``register_task`` -- presumably
    it is a sequence of task classes; confirm and fix the annotation.
    """
    from .testing import worker

    if NO_WORKER:
        # A generator fixture has to yield exactly once; falling off the end
        # without yielding makes pytest fail the fixture at setup time.
        # Hand back ``None`` so tests can still request this fixture when
        # worker start-up has been disabled.
        yield None
        return

    for module in celery_includes:
        celery_session_app.loader.import_task_module(module)
    for class_task in celery_class_tasks:
        celery_session_app.register_task(class_task)

    # Yield from inside the ``with`` block so the worker stays up until the
    # fixture is torn down.
    with worker.start_worker(celery_session_app,
                             pool=celery_worker_pool,
                             **celery_worker_parameters) as w:
        yield w
| 106 | |
| 107 | |
| 108 | @pytest.fixture(scope='session') |
nothing calls this directly
no test coverage detected