(self, on_task_request,
init_callback=noop, hostname=None,
pool=None, app=None,
timer=None, controller=None, hub=None, amqheartbeat=None,
worker_options=None, disable_rate_limits=False,
initial_prefetch_count=2, prefetch_multiplier=1, **kwargs)
| 183 | self.send_all(parent, 'shutdown') |
| 184 | |
| 185 | def __init__(self, on_task_request, |
| 186 | init_callback=noop, hostname=None, |
| 187 | pool=None, app=None, |
| 188 | timer=None, controller=None, hub=None, amqheartbeat=None, |
| 189 | worker_options=None, disable_rate_limits=False, |
| 190 | initial_prefetch_count=2, prefetch_multiplier=1, **kwargs): |
| 191 | self.app = app |
| 192 | self.controller = controller |
| 193 | self.init_callback = init_callback |
| 194 | self.hostname = hostname or gethostname() |
| 195 | self.pid = os.getpid() |
| 196 | self.pool = pool |
| 197 | self.timer = timer |
| 198 | self.strategies = self.Strategies() |
| 199 | self.conninfo = self.app.connection_for_read() |
| 200 | self.connection_errors = self.conninfo.connection_errors |
| 201 | self.channel_errors = self.conninfo.channel_errors |
| 202 | self._restart_state = restart_state(maxR=5, maxT=1) |
| 203 | |
| 204 | self._does_info = logger.isEnabledFor(logging.INFO) |
| 205 | self._limit_order = 0 |
| 206 | self.on_task_request = on_task_request |
| 207 | self.on_task_message = set() |
| 208 | self.amqheartbeat_rate = self.app.conf.broker_heartbeat_checkrate |
| 209 | self.disable_rate_limits = disable_rate_limits |
| 210 | self.initial_prefetch_count = initial_prefetch_count |
| 211 | self.prefetch_multiplier = prefetch_multiplier |
| 212 | self._maximum_prefetch_restored = True |
| 213 | |
| 214 | # this contains a tokenbucket for each task type by name, used for |
| 215 | # rate limits, or None if rate limits are disabled for that task. |
| 216 | self.task_buckets = defaultdict(lambda: None) |
| 217 | self.reset_rate_limits() |
| 218 | |
| 219 | self.hub = hub |
| 220 | if self.hub or getattr(self.pool, 'is_green', False): |
| 221 | self.amqheartbeat = amqheartbeat |
| 222 | if self.amqheartbeat is None: |
| 223 | self.amqheartbeat = self.app.conf.broker_heartbeat |
| 224 | else: |
| 225 | self.amqheartbeat = 0 |
| 226 | |
| 227 | if not hasattr(self, 'loop'): |
| 228 | self.loop = loops.asynloop if hub else loops.synloop |
| 229 | |
| 230 | if _detect_environment() == 'gevent': |
| 231 | # there's a gevent bug that causes timeouts to not be reset, |
| 232 | # so if the connection timeout is exceeded once, it can NEVER |
| 233 | # connect again. |
| 234 | self.app.conf.broker_connection_timeout = None |
| 235 | |
| 236 | self._pending_operations = [] |
| 237 | |
| 238 | self.steps = [] |
| 239 | self.blueprint = self.Blueprint( |
| 240 | steps=self.app.steps['consumer'], |
| 241 | on_close=self.on_close, |
| 242 | ) |
nothing calls this directly
no test coverage detected