MCPcopy
hub / github.com/celery/celery / asynloop

Function asynloop

celery/worker/loops.py:50–130  ·  view source on GitHub ↗

Non-blocking event loop.

(obj, connection, consumer, blueprint, hub, qos,
             heartbeat, clock, hbrate=2.0)

Source from the content-addressed store, hash-verified

48
49
50def asynloop(obj, connection, consumer, blueprint, hub, qos,
51 heartbeat, clock, hbrate=2.0):
52 """Non-blocking event loop."""
53 RUN = bootsteps.RUN
54 update_qos = qos.update
55 errors = connection.connection_errors
56
57 on_task_received = obj.create_task_handler()
58
59 heartbeat_error = _enable_amqheartbeats(hub.timer, connection, rate=hbrate)
60
61 consumer.on_message = on_task_received
62 obj.controller.register_with_event_loop(hub)
63 obj.register_with_event_loop(hub)
64 consumer.consume()
65 obj.on_ready()
66
67 # did_start_ok will verify that pool processes were able to start,
68 # but this will only work the first time we start, as
69 # maxtasksperchild will mess up metrics.
70 if not obj.restart_count and not obj.pool.did_start_ok():
71 raise WorkerLostError('Could not start worker processes')
72
73 # consumer.consume() may have prefetched up to our
74 # limit - drain an event so we're in a clean state
75 # prior to starting our event loop.
76 if connection.transport.driver_type == 'amqp':
77 hub.call_soon(_quick_drain, connection)
78
79 # FIXME: Use loop.run_forever
80 # Tried and works, but no time to test properly before release.
81 hub.propagate_errors = errors
82 loop = hub.create_loop()
83
84 try:
85 while blueprint.state == RUN and obj.connection:
86 state.maybe_shutdown()
87 if heartbeat_error[0] is not None:
88 raise heartbeat_error[0]
89
90 # We only update QoS when there's no more messages to read.
91 # This groups together qos calls, and makes sure that remote
92 # control commands will be prioritized over task messages.
93 if qos.prev != qos.value:
94 update_qos()
95
96 try:
97 next(loop)
98 except StopIteration:
99 loop = hub.create_loop()
100 except Exception:
101 # Reset the hub on error (e.g. connection loss) to clean up
102 # stale file descriptors and callbacks from the old connection.
103 # Also clear the timer queue so that stale periodic entries added by
104 # register_with_event_loop (e.g. maybe_restore_messages) do not fire
105 # against the broken connection after reconnect and trigger another
106 # crash before the new connection is fully established.
107 # All hub timers are re-registered during blueprint.start() once this

Calls 9

_enable_amqheartbeats Function · 0.85
create_task_handler Method · 0.80
consume Method · 0.80
on_ready Method · 0.80
did_start_ok Method · 0.45
call_soon Method · 0.45
reset Method · 0.45
clear Method · 0.45