Start Celery worker in a thread. Yields: celery.worker.Worker: worker instance.
(app: Celery,
concurrency: int = 1,
pool: str = 'solo',
loglevel: Union[str, int] = WORKER_LOGLEVEL,
logfile: Optional[str] = None,
WorkController: Any = TestWorkController,
perform_ping_check: bool = True,
shutdown_timeout: float = 10.0,
**kwargs)
| 138 | |
| 139 | @contextmanager |
| 140 | def _start_worker_thread(app: Celery, |
| 141 | concurrency: int = 1, |
| 142 | pool: str = 'solo', |
| 143 | loglevel: Union[str, int] = WORKER_LOGLEVEL, |
| 144 | logfile: Optional[str] = None, |
| 145 | WorkController: Any = TestWorkController, |
| 146 | perform_ping_check: bool = True, |
| 147 | shutdown_timeout: float = 10.0, |
| 148 | **kwargs) -> Iterable[worker.WorkController]: |
| 149 | """Start Celery worker in a thread. |
| 150 | |
| 151 | Yields: |
| 152 | celery.worker.Worker: worker instance. |
| 153 | """ |
| 154 | setup_app_for_worker(app, loglevel, logfile) |
| 155 | if perform_ping_check: |
| 156 | assert 'celery.ping' in app.tasks |
| 157 | # Make sure we can connect to the broker |
| 158 | with app.connection(hostname=os.environ.get('TEST_BROKER')) as conn: |
| 159 | conn.default_channel.queue_declare |
| 160 | |
| 161 | worker = WorkController( |
| 162 | app=app, |
| 163 | concurrency=concurrency, |
| 164 | hostname=kwargs.pop("hostname", anon_nodename()), |
| 165 | pool=pool, |
| 166 | loglevel=loglevel, |
| 167 | logfile=logfile, |
| 168 | # not allowed to override TestWorkController.on_consumer_ready |
| 169 | ready_callback=None, |
| 170 | without_heartbeat=kwargs.pop("without_heartbeat", True), |
| 171 | without_mingle=True, |
| 172 | without_gossip=True, |
| 173 | **kwargs) |
| 174 | |
| 175 | t = threading.Thread(target=worker.start, daemon=True) |
| 176 | t.start() |
| 177 | worker.ensure_started() |
| 178 | _set_task_join_will_block(False) |
| 179 | |
| 180 | try: |
| 181 | yield worker |
| 182 | finally: |
| 183 | from celery.worker import state |
| 184 | state.should_terminate = 0 |
| 185 | t.join(shutdown_timeout) |
| 186 | if t.is_alive(): |
| 187 | raise RuntimeError( |
| 188 | "Worker thread failed to exit within the allocated timeout. " |
| 189 | "Consider raising `shutdown_timeout` if your tasks take longer " |
| 190 | "to execute." |
| 191 | ) |
| 192 | state.should_terminate = None |
| 193 | |
| 194 | |
| 195 | @contextmanager |
no test coverage detected