Non-blocking event loop.
asynloop(obj, connection, consumer, blueprint, hub, qos,
heartbeat, clock, hbrate=2.0)
| 48 | |
| 49 | |
| 50 | def asynloop(obj, connection, consumer, blueprint, hub, qos, |
| 51 | heartbeat, clock, hbrate=2.0): |
| 52 | """Non-blocking event loop.""" |
| 53 | RUN = bootsteps.RUN |
| 54 | update_qos = qos.update |
| 55 | errors = connection.connection_errors |
| 56 | |
| 57 | on_task_received = obj.create_task_handler() |
| 58 | |
| 59 | heartbeat_error = _enable_amqheartbeats(hub.timer, connection, rate=hbrate) |
| 60 | |
| 61 | consumer.on_message = on_task_received |
| 62 | obj.controller.register_with_event_loop(hub) |
| 63 | obj.register_with_event_loop(hub) |
| 64 | consumer.consume() |
| 65 | obj.on_ready() |
| 66 | |
| 67 | # did_start_ok will verify that pool processes were able to start, |
| 68 | # but this will only work the first time we start, as |
| 69 | # maxtasksperchild will mess up metrics. |
| 70 | if not obj.restart_count and not obj.pool.did_start_ok(): |
| 71 | raise WorkerLostError('Could not start worker processes') |
| 72 | |
| 73 | # consumer.consume() may have prefetched up to our |
| 74 | # limit - drain an event so we're in a clean state |
| 75 | # prior to starting our event loop. |
| 76 | if connection.transport.driver_type == 'amqp': |
| 77 | hub.call_soon(_quick_drain, connection) |
| 78 | |
| 79 | # FIXME: Use loop.run_forever |
| 80 | # Tried and works, but no time to test properly before release. |
| 81 | hub.propagate_errors = errors |
| 82 | loop = hub.create_loop() |
| 83 | |
| 84 | try: |
| 85 | while blueprint.state == RUN and obj.connection: |
| 86 | state.maybe_shutdown() |
| 87 | if heartbeat_error[0] is not None: |
| 88 | raise heartbeat_error[0] |
| 89 | |
| 90 | # We only update QoS when there's no more messages to read. |
| 91 | # This groups together qos calls, and makes sure that remote |
| 92 | # control commands will be prioritized over task messages. |
| 93 | if qos.prev != qos.value: |
| 94 | update_qos() |
| 95 | |
| 96 | try: |
| 97 | next(loop) |
| 98 | except StopIteration: |
| 99 | loop = hub.create_loop() |
| 100 | except Exception: |
| 101 | # Reset the hub on error (e.g. connection loss) to clean up |
| 102 | # stale file descriptors and callbacks from the old connection. |
| 103 | # Also clear the timer queue so that stale periodic entries added by |
| 104 | # register_with_event_loop (e.g. maybe_restore_messages) do not fire |
| 105 | # against the broken connection after reconnect and trigger another |
| 106 | # crash before the new connection is fully established. |
| 107 | # All hub timers are re-registered during blueprint.start() once this |