MCPcopy
hub / github.com/celery/celery / start_worker

Function start_worker

celery/contrib/testing/worker.py:100–136  ·  view source on GitHub ↗

Start embedded worker. Yields: celery.app.worker.Worker: worker instance.

(
    app,  # type: Celery
    concurrency=1,  # type: int
    pool='solo',  # type: str
    loglevel=WORKER_LOGLEVEL,  # type: Union[str, int]
    logfile=None,  # type: str
    perform_ping_check=True,  # type: bool
    ping_task_timeout=10.0,  # type: float
    shutdown_timeout=10.0,  # type: float
    **kwargs  # type: Any
)

Source from the content-addressed store, hash-verified

98
99@contextmanager
100def start_worker(
101 app, # type: Celery
102 concurrency=1, # type: int
103 pool='solo', # type: str
104 loglevel=WORKER_LOGLEVEL, # type: Union[str, int]
105 logfile=None, # type: str
106 perform_ping_check=True, # type: bool
107 ping_task_timeout=10.0, # type: float
108 shutdown_timeout=10.0, # type: float
109 **kwargs # type: Any
110):
111 # type: (...) -> Iterable
112 """Start embedded worker.
113
114 Yields:
115 celery.app.worker.Worker: worker instance.
116 """
117 test_worker_starting.send(sender=app)
118
119 worker = None
120 try:
121 with _start_worker_thread(app,
122 concurrency=concurrency,
123 pool=pool,
124 loglevel=loglevel,
125 logfile=logfile,
126 perform_ping_check=perform_ping_check,
127 shutdown_timeout=shutdown_timeout,
128 **kwargs) as worker:
129 if perform_ping_check:
130 from .tasks import ping
131 with allow_join_result():
132 assert ping.delay().get(timeout=ping_task_timeout) == 'pong'
133
134 yield worker
135 finally:
136 test_worker_stopped.send(sender=app, worker=worker)
137
138
139@contextmanager

Calls 5

allow_join_resultFunction · 0.90
_start_worker_threadFunction · 0.85
sendMethod · 0.45
getMethod · 0.45
delayMethod · 0.45