MCPcopy
hub / github.com/celery/celery / Mingle

Class Mingle

celery/worker/consumer/mingle.py:13–76  ·  view source on GitHub ↗

Bootstep syncing state with neighbor workers. At startup, or upon consumer restart, this will: - Sync logical clocks. - Sync revoked tasks.

Source from the content-addressed store, hash-verified

11
12
13class Mingle(bootsteps.StartStopStep):
14 """Bootstep syncing state with neighbor workers.
15
16 At startup, or upon consumer restart, this will:
17
18 - Sync logical clocks.
19 - Sync revoked tasks.
20
21 """
22
23 label = 'Mingle'
24 requires = (Events,)
25 compatible_transports = {'amqp', 'redis', 'gcpubsub'}
26
27 def __init__(self, c, without_mingle=False, **kwargs):
28 self.enabled = not without_mingle and self.compatible_transport(c.app)
29 super().__init__(
30 c, without_mingle=without_mingle, **kwargs)
31
32 def compatible_transport(self, app):
33 with app.connection_for_read() as conn:
34 return conn.transport.driver_type in self.compatible_transports
35
36 def start(self, c):
37 self.sync(c)
38
39 def sync(self, c):
40 info('mingle: searching for neighbors')
41 replies = self.send_hello(c)
42 if replies:
43 info('mingle: sync with %s nodes',
44 len([reply for reply, value in replies.items() if value]))
45 [self.on_node_reply(c, nodename, reply)
46 for nodename, reply in replies.items() if reply]
47 info('mingle: sync complete')
48 else:
49 info('mingle: all alone')
50
51 def send_hello(self, c):
52 inspect = c.app.control.inspect(timeout=1.0, connection=c.connection)
53 our_revoked = c.controller.state.revoked
54 replies = inspect.hello(c.hostname, our_revoked._data) or {}
55 replies.pop(c.hostname, None) # delete my own response
56 return replies
57
58 def on_node_reply(self, c, nodename, reply):
59 debug('mingle: processing reply from %s', nodename)
60 try:
61 self.sync_with_node(c, **reply)
62 except MemoryError:
63 raise
64 except Exception as exc: # pylint: disable=broad-except
65 exception('mingle: sync with %s failed: %r', nodename, exc)
66
67 def sync_with_node(self, c, clock=None, revoked=None, **kwargs):
68 self.on_clock_event(c, clock)
69 self.on_revoked_received(c, revoked)
70

Callers 2

test_start_no_repliesMethod · 0.90
test_startMethod · 0.90

Calls

no outgoing calls

Tested by 2

test_start_no_repliesMethod · 0.72
test_startMethod · 0.72