Start task consumer.
(self, c)
| 27 | super().__init__(c, **kwargs) |
| 28 | |
def start(self, c):
    """Start the task consumer for consumer ``c``.

    Steps, in order:

    1. Refresh the task strategies via ``c.update_strategies()``.
    2. Apply ``c.initial_prefetch_count`` to the connection's default
       channel with ``basic_qos``.
    3. Create ``c.task_consumer`` from ``c.app.amqp.TaskConsumer``.
    4. Install ``c.qos``, a ``QoS`` object whose callback forwards the
       new prefetch count to ``c.task_consumer.qos``.
    5. If the ``worker_disable_prefetch`` setting is truthy and the
       broker transport's ``driver_type`` is ``'redis'``, wrap the
       channel QoS object's ``can_consume`` so it returns ``False``
       while ``state.reserved_requests`` holds at least as many
       entries as the concurrency limit.  For any other driver type
       a warning is logged and the setting has no effect.

    :param c: the consumer instance being started.
    """
    c.update_strategies()

    use_global = self.qos_global(c)

    # Seed the default channel with the initial prefetch count.
    default_channel = c.connection.default_channel
    default_channel.basic_qos(0, c.initial_prefetch_count, use_global)

    c.task_consumer = c.app.amqp.TaskConsumer(
        c.connection,
        on_decode_error=c.on_decode_error,
    )

    def set_prefetch_count(prefetch_count):
        # NOTE(review): the parameter name is kept unchanged because the
        # QoS object may invoke this callback by keyword -- confirm
        # against the QoS implementation before renaming it.
        return c.task_consumer.qos(
            prefetch_count=prefetch_count, apply_global=use_global,
        )

    c.qos = QoS(
        set_prefetch_count,
        c.initial_prefetch_count,
        max_prefetch=c.app.conf.worker_eta_task_limit,
    )

    # Nothing more to do unless prefetching has been explicitly disabled.
    if not c.app.conf.worker_disable_prefetch:
        return

    if c.connection.transport.driver_type == 'redis':
        from types import MethodType

        from celery.worker import state

        qos_handle = c.task_consumer.channel.qos
        base_can_consume = qos_handle.can_consume

        def can_consume(self):
            # Use the controller's max_concurrency when it is set and
            # truthy; otherwise fall back to the pool's num_processes.
            capacity = getattr(c.controller, "max_concurrency", None)
            if not capacity:
                capacity = c.pool.num_processes
            at_capacity = len(state.reserved_requests) >= capacity
            return False if at_capacity else base_can_consume()

        # Bind the wrapper to this QoS instance only, replacing its
        # original can_consume (still reachable via the closure above).
        qos_handle.can_consume = MethodType(can_consume, qos_handle)
    else:
        # Only the 'redis' driver type is handled; warn and leave the
        # channel QoS untouched for every other transport.
        logger.warning(
            f"worker_disable_prefetch is only supported for Redis brokers. "
            f"Current broker transport: {c.connection.transport.driver_type}. "
            f"Ignoring disable_prefetch setting."
        )
| 79 | |
| 80 | def stop(self, c): |
| 81 | """Stop task consumer.""" |