MCPcopy
hub / github.com/celery/celery / synloop

Function synloop

celery/worker/loops.py:133–168  ·  view source on GitHub ↗

Fallback blocking event loop for transports that don't support AIO.

(obj, connection, consumer, blueprint, hub, qos,
            heartbeat, clock, hbrate=2.0, **kwargs)

Source from the content-addressed store, hash-verified

131
132
def synloop(obj, connection, consumer, blueprint, hub, qos,
            heartbeat, clock, hbrate=2.0, **kwargs):
    """Fallback blocking event loop for transports that don't support AIO.

    Installs the task handler on ``consumer``, starts consuming, signals
    readiness through ``obj.on_ready()``, and then keeps draining events
    from ``connection`` for as long as ``blueprint.state`` equals
    ``bootsteps.RUN`` and ``obj.connection`` is truthy.

    ``hub``, ``heartbeat``, ``clock`` and ``**kwargs`` are accepted but not
    used by this loop.

    Raises:
        Whatever exception is found in slot 0 of the heartbeat error holder,
        and any ``OSError`` raised while draining events if the blueprint is
        still in the RUN state.
    """
    running = bootsteps.RUN
    handle_task = obj.create_task_handler()
    # Bound once up front so every iteration reuses the same callable.
    run_pending = obj.perform_pending_operations

    # One-slot holder: index 0 is either None or an exception to re-raise.
    # NOTE(review): presumably _enable_amqheartbeats returns such a holder
    # and fills it from the timer when a heartbeat fails -- confirm there.
    if getattr(obj.pool, 'is_green', False):
        hb_failure = _enable_amqheartbeats(
            obj.timer, connection, rate=hbrate,
        )
    else:
        hb_failure = [None]

    consumer.on_message = handle_task
    consumer.consume()

    obj.on_ready()

    def _drain_once():
        """Run a single pass of the blocking loop."""
        failure = hb_failure[0]
        if failure is not None:
            raise failure

        # Only push the QoS value when it has actually changed.
        if qos.prev != qos.value:
            qos.update()

        try:
            run_pending()
            connection.drain_events(timeout=2.0)
        except socket.timeout:
            # Nothing arrived within the timeout window; not an error.
            return
        except OSError:
            # Socket errors are only propagated while still in RUN state.
            if blueprint.state == running:
                raise

    while True:
        if not (blueprint.state == running and obj.connection):
            break
        try:
            state.maybe_shutdown()
        finally:
            # A loop pass is executed even when maybe_shutdown() raises.
            _drain_once()

Calls 5

_enable_amqheartbeatsFunction · 0.85
_loop_cycleFunction · 0.85
create_task_handlerMethod · 0.80
consumeMethod · 0.80
on_readyMethod · 0.80