MCPcopy
hub / github.com/celery/celery / _start_worker_thread

Function _start_worker_thread

celery/contrib/testing/worker.py:140–192  ·  view source on GitHub ↗

Start Celery worker in a thread. Yields: celery.worker.Worker: worker instance.

(app: Celery,
                         concurrency: int = 1,
                         pool: str = 'solo',
                         loglevel: Union[str, int] = WORKER_LOGLEVEL,
                         logfile: Optional[str] = None,
                         WorkController: Any = TestWorkController,
                         perform_ping_check: bool = True,
                         shutdown_timeout: float = 10.0,
                         **kwargs)

Source from the content-addressed store, hash-verified

138
@contextmanager
def _start_worker_thread(app: Celery,
                         concurrency: int = 1,
                         pool: str = 'solo',
                         loglevel: Union[str, int] = WORKER_LOGLEVEL,
                         logfile: Optional[str] = None,
                         WorkController: Any = TestWorkController,
                         perform_ping_check: bool = True,
                         shutdown_timeout: float = 10.0,
                         **kwargs) -> Iterable[worker.WorkController]:
    """Run a Celery worker in a daemon thread while the ``with`` body executes.

    Args:
        app: Application the worker is created for; also used to open the
            broker connection checked before the worker is built.
        concurrency: Forwarded to ``WorkController``.
        pool: Forwarded to ``WorkController``.
        loglevel: Passed to ``setup_app_for_worker`` and ``WorkController``.
        logfile: Passed to ``setup_app_for_worker`` and ``WorkController``.
        WorkController: Callable used to construct the worker object.
        perform_ping_check: When true, assert that the ``celery.ping`` task
            is registered in ``app.tasks``.
        shutdown_timeout: Seconds to wait for the worker thread to exit once
            the ``with`` body has finished.
        **kwargs: Extra keyword arguments for ``WorkController``.
            ``hostname`` (default: ``anon_nodename()``) and
            ``without_heartbeat`` (default: ``True``) are popped and passed
            explicitly; everything else is forwarded unchanged.

    Yields:
        celery.worker.Worker: worker instance.

    Raises:
        AssertionError: If ``perform_ping_check`` is true and ``celery.ping``
            is missing from ``app.tasks``.
        RuntimeError: If the worker thread is still alive after waiting
            ``shutdown_timeout`` seconds.
    """
    setup_app_for_worker(app, loglevel, logfile)
    if perform_ping_check:
        assert 'celery.ping' in app.tasks

    # Make sure we can connect to the broker
    broker_host = os.environ.get('TEST_BROKER')
    with app.connection(hostname=broker_host) as conn:
        # Attribute lookup only -- ``queue_declare`` is intentionally not
        # called here.
        # NOTE(review): presumably resolving ``default_channel`` is what
        # actually opens the connection -- confirm against kombu.
        conn.default_channel.queue_declare

    # Take the caller-overridable options out of ``kwargs`` before the
    # remainder is splatted into the constructor, so they are not passed twice.
    nodename = kwargs.pop("hostname", anon_nodename())
    skip_heartbeat = kwargs.pop("without_heartbeat", True)

    instance = WorkController(
        app=app,
        concurrency=concurrency,
        hostname=nodename,
        pool=pool,
        loglevel=loglevel,
        logfile=logfile,
        # not allowed to override TestWorkController.on_consumer_ready
        ready_callback=None,
        without_heartbeat=skip_heartbeat,
        without_mingle=True,
        without_gossip=True,
        **kwargs)

    worker_thread = threading.Thread(target=instance.start, daemon=True)
    worker_thread.start()
    instance.ensure_started()
    _set_task_join_will_block(False)

    try:
        yield instance
    finally:
        from celery.worker import state

        # NOTE(review): presumably a non-None value asks the running worker
        # to terminate -- confirm in celery.worker.state.
        state.should_terminate = 0
        worker_thread.join(shutdown_timeout)
        if worker_thread.is_alive():
            # The flag is deliberately left at 0 on this path; only a clean
            # exit resets it below.
            timeout_message = (
                "Worker thread failed to exit within the allocated timeout. "
                "Consider raising `shutdown_timeout` if your tasks take longer "
                "to execute."
            )
            raise RuntimeError(timeout_message)
        state.should_terminate = None
193
194
195@contextmanager

Callers 1

start_workerFunction · 0.85

Calls 11

anon_nodenameFunction · 0.90
setup_app_for_workerFunction · 0.85
is_aliveMethod · 0.80
WorkControllerClass · 0.50
connectionMethod · 0.45
getMethod · 0.45
popMethod · 0.45
startMethod · 0.45
ensure_startedMethod · 0.45
joinMethod · 0.45

Tested by

no test coverage detected