| 181 | )] |
| 182 | |
def on_message(self, prepare, message):
    """Route one incoming message according to its routing key.

    Processing order:

    1. Messages whose routing key starts with the ``task`` segment are
       ignored.
    2. If ``self.event_handlers`` has an entry for the exact routing key,
       that handler is called with ``message.payload`` and its result is
       returned.
    3. Otherwise the sender hostname is read (header first, body as the
       fallback).  A message from another host is passed through
       ``prepare`` and the resulting event is handed to
       ``self.update_state``; a message from this host only advances
       ``self.clock``.

    :param prepare: callable invoked with ``message.payload``; must return
        a two-item sequence whose second item is the event given to
        ``self.update_state``.
    :param message: object exposing ``delivery_info`` (mapping with a
        ``'routing_key'`` entry), ``headers`` (mapping) and ``payload``.
    :returns: the handler's return value when a dedicated handler exists,
        otherwise ``None``.
    :raises KeyError: if the hostname is neither in the headers nor in
        the payload.
    """
    routing_key = message.delivery_info['routing_key']

    # For redis when `fanout_patterns=False` (See Issue #1882)
    leading_segment = routing_key.split('.', 1)[0]
    if leading_segment == 'task':
        return

    try:
        dedicated_handler = self.event_handlers[routing_key]
    except KeyError:
        # No dedicated handler registered: fall through to the
        # generic event processing below.
        pass
    else:
        return dedicated_handler(message.payload)

    # proto2: hostname in header; proto1: in body
    sender = message.headers.get('hostname')
    if not sender:
        sender = message.payload['hostname']

    if sender == self.hostname:
        # The message originated from this node: only tick the clock.
        self.clock.forward()
        return

    try:
        _, event = prepare(message.payload)
        self.update_state(event)
    except (DecodeError, ContentDisallowed, TypeError) as exc:
        # Undecodable/disallowed payloads are logged, not propagated.
        logger.error(exc)