MCPcopy
hub / github.com/celery/celery / Tasks

Class Tasks

celery/worker/consumer/tasks.py:20–119  ·  view source on GitHub ↗

Bootstep starting the task message consumer.

Source from the content-addressed store, hash-verified

18
19
20class Tasks(bootsteps.StartStopStep):
21 """Bootstep starting the task message consumer."""
22
23 requires = (Mingle,)
24
def __init__(self, c, **kwargs):
    """Reset the consumer's task-consumer/QoS slots and set up the step.

    ``c`` is the object this bootstep is attached to; it and any extra
    keyword arguments are handed on to the parent initializer unchanged.
    """
    # Both attributes begin as None; start() is what assigns the real
    # task consumer and QoS objects to them.
    c.task_consumer = None
    c.qos = None
    super().__init__(c, **kwargs)
28
29 def start(self, c):
30 """Start task consumer."""
31 c.update_strategies()
32
33 qos_global = self.qos_global(c)
34
35 # set initial prefetch count
36 c.connection.default_channel.basic_qos(
37 0, c.initial_prefetch_count, qos_global,
38 )
39
40 c.task_consumer = c.app.amqp.TaskConsumer(
41 c.connection, on_decode_error=c.on_decode_error,
42 )
43
44 def set_prefetch_count(prefetch_count):
45 return c.task_consumer.qos(
46 prefetch_count=prefetch_count,
47 apply_global=qos_global,
48 )
49 eta_task_limit = c.app.conf.worker_eta_task_limit
50 c.qos = QoS(
51 set_prefetch_count, c.initial_prefetch_count, max_prefetch=eta_task_limit
52 )
53
54 if c.app.conf.worker_disable_prefetch:
55 # Only apply disable-prefetch for Redis brokers
56 is_redis_broker = c.connection.transport.driver_type == 'redis'
57 if not is_redis_broker:
58 logger.warning(
59 f"worker_disable_prefetch is only supported for Redis brokers. "
60 f"Current broker transport: {c.connection.transport.driver_type}. "
61 f"Ignoring disable_prefetch setting."
62 )
63 return
64
65 from types import MethodType
66
67 from celery.worker import state
68 channel_qos = c.task_consumer.channel.qos
69 original_can_consume = channel_qos.can_consume
70
71 def can_consume(self):
72 # Prefer autoscaler's max_concurrency if set; otherwise fall back to pool size
73 limit = getattr(c.controller, "max_concurrency", None) or c.pool.num_processes
74 if len(state.reserved_requests) >= limit:
75 return False
76 return original_can_consume()
77

Calls

no outgoing calls