MCPcopy
hub / github.com/celery/celery / on_elect_ack

Method on_elect_ack

celery/worker/consumer/gossip.py:110–134  ·  view source on GitHub ↗
(self, event)

Source from the content-addressed store, hash-verified

108 self.dispatcher = c.event_dispatcher
109
def on_elect_ack(self, event):
    """Record one election acknowledgement and settle the election if due.

    ``event`` must carry an ``'id'`` key naming the election and a
    ``'hostname'`` key naming the node that acknowledged it.  Events whose
    id has no entry in ``self.consensus_replies`` are ignored.

    Once at least as many replies have been collected as there are alive
    workers, the winning request is taken from
    ``self.clock.sort_heap(...)``.  If the winner is this node
    (``self.full_hostname``), the handler registered for the election
    topic is invoked with the requested action; otherwise the outcome is
    only logged.  The bookkeeping for the election is then discarded.
    """
    election_id = event['id']
    try:
        acks = self.consensus_replies[election_id]
    except KeyError:
        # No reply list is tracked for this election id: not for us.
        return

    # Snapshot the alive workers before recording this reply.
    live_workers = set(self.state.alive_workers())
    acks.append(event['hostname'])

    if len(acks) < len(live_workers):
        # Still waiting for more acknowledgements.
        return

    pending_requests = self.consensus_requests[election_id]
    # NOTE(review): sort_heap presumably yields the winning
    # (clock, hostname, topic, action) entry -- confirm against the clock
    # implementation.
    _, leader, topic, action = self.clock.sort_heap(pending_requests)

    elected_self = leader == self.full_hostname
    if elected_self:
        info('I won the election %r', election_id)
        try:
            handler = self.election_handlers[topic]
        except KeyError:
            logger.exception('Unknown election topic %r', topic)
        else:
            handler(action)
    else:
        info('node %s elected for %r', leader, election_id)

    # Election settled: drop its request heap and reply list.
    self.consensus_requests.pop(election_id, None)
    self.consensus_replies.pop(election_id, None)
135
136 def on_node_join(self, worker):
137 debug('%s joined the party', worker.hostname)

Callers 1

setup_electionMethod · 0.80

Calls 3

infoFunction · 0.85
alive_workersMethod · 0.80
popMethod · 0.45

Tested by 1

setup_electionMethod · 0.64