Bootstep syncing state with neighbor workers. At startup, or upon consumer restart, this will: - Sync logical clocks. - Sync revoked tasks.
| 11 | |
| 12 | |
| 13 | class Mingle(bootsteps.StartStopStep): |
| 14 | """Bootstep syncing state with neighbor workers. |
| 15 | |
| 16 | At startup, or upon consumer restart, this will: |
| 17 | |
| 18 | - Sync logical clocks. |
| 19 | - Sync revoked tasks. |
| 20 | |
| 21 | """ |
| 22 | |
| 23 | label = 'Mingle' |
| 24 | requires = (Events,) |
| 25 | compatible_transports = {'amqp', 'redis', 'gcpubsub'} |
| 26 | |
| 27 | def __init__(self, c, without_mingle=False, **kwargs): |
| 28 | self.enabled = not without_mingle and self.compatible_transport(c.app) |
| 29 | super().__init__( |
| 30 | c, without_mingle=without_mingle, **kwargs) |
| 31 | |
| 32 | def compatible_transport(self, app): |
| 33 | with app.connection_for_read() as conn: |
| 34 | return conn.transport.driver_type in self.compatible_transports |
| 35 | |
| 36 | def start(self, c): |
| 37 | self.sync(c) |
| 38 | |
| 39 | def sync(self, c): |
| 40 | info('mingle: searching for neighbors') |
| 41 | replies = self.send_hello(c) |
| 42 | if replies: |
| 43 | info('mingle: sync with %s nodes', |
| 44 | len([reply for reply, value in replies.items() if value])) |
| 45 | [self.on_node_reply(c, nodename, reply) |
| 46 | for nodename, reply in replies.items() if reply] |
| 47 | info('mingle: sync complete') |
| 48 | else: |
| 49 | info('mingle: all alone') |
| 50 | |
| 51 | def send_hello(self, c): |
| 52 | inspect = c.app.control.inspect(timeout=1.0, connection=c.connection) |
| 53 | our_revoked = c.controller.state.revoked |
| 54 | replies = inspect.hello(c.hostname, our_revoked._data) or {} |
| 55 | replies.pop(c.hostname, None) # delete my own response |
| 56 | return replies |
| 57 | |
| 58 | def on_node_reply(self, c, nodename, reply): |
| 59 | debug('mingle: processing reply from %s', nodename) |
| 60 | try: |
| 61 | self.sync_with_node(c, **reply) |
| 62 | except MemoryError: |
| 63 | raise |
| 64 | except Exception as exc: # pylint: disable=broad-except |
| 65 | exception('mingle: sync with %s failed: %r', nodename, exc) |
| 66 | |
| 67 | def sync_with_node(self, c, clock=None, revoked=None, **kwargs): |
| 68 | self.on_clock_event(c, clock) |
| 69 | self.on_revoked_received(c, revoked) |
| 70 |
no outgoing calls