MCPcopy
hub / github.com/celery/celery / celery_session_worker

Function celery_session_worker

celery/contrib/pytest.py:85–105  ·  view source on GitHub ↗

Session Fixture: Start worker that lives throughout test suite.

(
    request,  # type: Any
    celery_session_app,  # type: Celery
    celery_includes,  # type: Sequence[str]
    celery_class_tasks,  # type: str
    celery_worker_pool,  # type: Any
    celery_worker_parameters,  # type: Mapping[str, Any]
)

Source from the content-addressed store, hash-verified

83
84@pytest.fixture(scope='session')
85def celery_session_worker(
86 request, # type: Any
87 celery_session_app, # type: Celery
88 celery_includes, # type: Sequence[str]
89 celery_class_tasks, # type: str
90 celery_worker_pool, # type: Any
91 celery_worker_parameters, # type: Mapping[str, Any]
92):
93 # type: (...) -> WorkController
94 """Session Fixture: Start worker that lives throughout test suite."""
95 from .testing import worker
96
97 if not NO_WORKER:
98 for module in celery_includes:
99 celery_session_app.loader.import_task_module(module)
100 for class_task in celery_class_tasks:
101 celery_session_app.register_task(class_task)
102 with worker.start_worker(celery_session_app,
103 pool=celery_worker_pool,
104 **celery_worker_parameters) as w:
105 yield w
106
107
108@pytest.fixture(scope='session')

Callers

nothing calls this directly

Calls 2

import_task_moduleMethod · 0.80
register_taskMethod · 0.80

Tested by

no test coverage detected