MCPcopy
hub / github.com/celery/celery / __init__

Method __init__

celery/worker/consumer/consumer.py:185–243  ·  view source on GitHub ↗
(self, on_task_request,
                 init_callback=noop, hostname=None,
                 pool=None, app=None,
                 timer=None, controller=None, hub=None, amqheartbeat=None,
                 worker_options=None, disable_rate_limits=False,
                 initial_prefetch_count=2, prefetch_multiplier=1, **kwargs)

Source from the content-addressed store, hash-verified

183 self.send_all(parent, 'shutdown')
184
185 def __init__(self, on_task_request,
186 init_callback=noop, hostname=None,
187 pool=None, app=None,
188 timer=None, controller=None, hub=None, amqheartbeat=None,
189 worker_options=None, disable_rate_limits=False,
190 initial_prefetch_count=2, prefetch_multiplier=1, **kwargs):
191 self.app = app
192 self.controller = controller
193 self.init_callback = init_callback
194 self.hostname = hostname or gethostname()
195 self.pid = os.getpid()
196 self.pool = pool
197 self.timer = timer
198 self.strategies = self.Strategies()
199 self.conninfo = self.app.connection_for_read()
200 self.connection_errors = self.conninfo.connection_errors
201 self.channel_errors = self.conninfo.channel_errors
202 self._restart_state = restart_state(maxR=5, maxT=1)
203
204 self._does_info = logger.isEnabledFor(logging.INFO)
205 self._limit_order = 0
206 self.on_task_request = on_task_request
207 self.on_task_message = set()
208 self.amqheartbeat_rate = self.app.conf.broker_heartbeat_checkrate
209 self.disable_rate_limits = disable_rate_limits
210 self.initial_prefetch_count = initial_prefetch_count
211 self.prefetch_multiplier = prefetch_multiplier
212 self._maximum_prefetch_restored = True
213
214 # this contains a tokenbucket for each task type by name, used for
215 # rate limits, or None if rate limits are disabled for that task.
216 self.task_buckets = defaultdict(lambda: None)
217 self.reset_rate_limits()
218
219 self.hub = hub
220 if self.hub or getattr(self.pool, 'is_green', False):
221 self.amqheartbeat = amqheartbeat
222 if self.amqheartbeat is None:
223 self.amqheartbeat = self.app.conf.broker_heartbeat
224 else:
225 self.amqheartbeat = 0
226
227 if not hasattr(self, 'loop'):
228 self.loop = loops.asynloop if hub else loops.synloop
229
230 if _detect_environment() == 'gevent':
231 # there's a gevent bug that causes timeouts to not be reset,
232 # so if the connection timeout is exceeded once, it can NEVER
233 # connect again.
234 self.app.conf.broker_connection_timeout = None
235
236 self._pending_operations = []
237
238 self.steps = []
239 self.blueprint = self.Blueprint(
240 steps=self.app.steps['consumer'],
241 on_close=self.on_close,
242 )

Callers

nothing calls this directly

Calls 4

reset_rate_limits · Method · 0.95
isEnabledFor · Method · 0.80
connection_for_read · Method · 0.45
apply · Method · 0.45

Tested by

no test coverage detected