| 108 | self.dispatcher = c.event_dispatcher |
| 109 | |
| 110 | def on_elect_ack(self, event): |
| 111 | id = event['id'] |
| 112 | try: |
| 113 | replies = self.consensus_replies[id] |
| 114 | except KeyError: |
| 115 | return # not for us |
| 116 | alive_workers = set(self.state.alive_workers()) |
| 117 | replies.append(event['hostname']) |
| 118 | |
| 119 | if len(replies) >= len(alive_workers): |
| 120 | _, leader, topic, action = self.clock.sort_heap( |
| 121 | self.consensus_requests[id], |
| 122 | ) |
| 123 | if leader == self.full_hostname: |
| 124 | info('I won the election %r', id) |
| 125 | try: |
| 126 | handler = self.election_handlers[topic] |
| 127 | except KeyError: |
| 128 | logger.exception('Unknown election topic %r', topic) |
| 129 | else: |
| 130 | handler(action) |
| 131 | else: |
| 132 | info('node %s elected for %r', leader, id) |
| 133 | self.consensus_requests.pop(id, None) |
| 134 | self.consensus_replies.pop(id, None) |
| 135 | |
| 136 | def on_node_join(self, worker): |
| 137 | debug('%s joined the party', worker.hostname) |