hub / github.com/celery/celery / on_message

Method on_message

celery/worker/consumer/gossip.py:183–206
(self, prepare, message)

Source from the content-addressed store, hash-verified

    def on_message(self, prepare, message):
        _type = message.delivery_info['routing_key']

        # For redis when `fanout_patterns=False` (See Issue #1882)
        if _type.split('.', 1)[0] == 'task':
            return
        try:
            handler = self.event_handlers[_type]
        except KeyError:
            pass
        else:
            return handler(message.payload)

        # proto2: hostname in header; proto1: in body
        hostname = (message.headers.get('hostname') or
                    message.payload['hostname'])
        if hostname != self.hostname:
            try:
                _, event = prepare(message.payload)
                self.update_state(event)
            except (DecodeError, ContentDisallowed, TypeError) as exc:
                logger.error(exc)
        else:
            self.clock.forward()
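
The control flow above can be exercised without a broker. The following is a minimal sketch, not Celery's implementation: `SketchGossip`, `make_message`, and the `prepare` stub are hypothetical stand-ins, the message is a plain `SimpleNamespace` rather than a kombu message, and the kombu exceptions caught in the original are replaced by built-in ones. It assumes the final `else` pairs with the hostname check, so the clock only advances for the worker's own events.

```python
from types import SimpleNamespace


class SketchGossip:
    """Hypothetical stand-in that mirrors the branching of on_message."""

    def __init__(self, hostname):
        self.hostname = hostname
        self.event_handlers = {}  # routing key -> callable(payload)
        self.seen = []            # events handed to update_state
        self.clock_ticks = 0      # stands in for self.clock.forward()
        self.errors = []          # stands in for logger.error(exc)

    def update_state(self, event):
        self.seen.append(event)

    def on_message(self, prepare, message):
        _type = message.delivery_info['routing_key']

        # 1. Task events are dropped outright.
        if _type.split('.', 1)[0] == 'task':
            return None

        # 2. A routing key with a dedicated handler short-circuits.
        try:
            handler = self.event_handlers[_type]
        except KeyError:
            pass
        else:
            return handler(message.payload)

        # 3. Hostname comes from the headers, falling back to the body.
        hostname = (message.headers.get('hostname') or
                    message.payload['hostname'])
        if hostname != self.hostname:
            try:
                _, event = prepare(message.payload)
                self.update_state(event)
            # Assumption: built-ins replace DecodeError / ContentDisallowed.
            except (TypeError, ValueError) as exc:
                self.errors.append(exc)
        else:
            # 4. Our own event: only advance the logical clock.
            self.clock_ticks += 1
        return None


def make_message(routing_key, payload, headers=None):
    """Build a message-like object with the three attributes used above."""
    return SimpleNamespace(
        delivery_info={'routing_key': routing_key},
        headers=headers or {},
        payload=payload,
    )


def prepare(payload):
    """Hypothetical decoder returning a (type, event) pair."""
    return payload['type'], payload


g = SketchGossip('w1@example.com')
g.event_handlers['worker.elect'] = lambda payload: ('elect', payload['id'])

r1 = g.on_message(prepare, make_message('task.received', {}))
r2 = g.on_message(prepare, make_message('worker.elect', {'id': 7}))
# Hostname only in the body, from another worker: state is updated.
g.on_message(prepare, make_message(
    'worker.heartbeat',
    {'type': 'worker-heartbeat', 'hostname': 'w2@example.com'}))
# Hostname in the headers, from this worker: the clock ticks.
g.on_message(prepare, make_message(
    'worker.heartbeat',
    {'type': 'worker-heartbeat'},
    headers={'hostname': 'w1@example.com'}))
```

The routing keys and payload fields here are illustrative. Recording clock ticks and errors on the instance is a choice made for this sketch so each branch can be checked without a logger or a real clock object.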

Callers 2

test_on_message__task · Method · 0.95
test_on_message · Method · 0.95

Calls 4

prepare · Function · 0.50
get · Method · 0.45
update_state · Method · 0.45
error · Method · 0.45

Tested by 2

test_on_message__task · Method · 0.76
test_on_message · Method · 0.76