MCPcopy
hub / github.com/celery/celery / task_message_handler

Function task_message_handler

celery/worker/strategy.py:132–208  ·  view source on GitHub ↗
(message, body, ack, reject, callbacks,
                             to_timestamp=to_timestamp)

Source from the content-addressed store, hash-verified

130 revoked_tasks = consumer.controller.state.revoked
131
132 def task_message_handler(message, body, ack, reject, callbacks,
133 to_timestamp=to_timestamp):
134 if body is None and 'args' not in message.payload:
135 body, headers, decoded, utc = (
136 message.body, message.headers, False, app.uses_utc_timezone(),
137 )
138 else:
139 if 'args' in message.payload:
140 body, headers, decoded, utc = hybrid_to_proto2(message,
141 message.payload)
142 else:
143 body, headers, decoded, utc = proto1_to_proto2(message, body)
144
145 req = Req(
146 message,
147 on_ack=ack, on_reject=reject, app=app, hostname=hostname,
148 eventer=eventer, task=task, connection_errors=connection_errors,
149 body=body, headers=headers, decoded=decoded, utc=utc,
150 )
151 if _does_info:
152 # Similar to `app.trace.info()`, we pass the formatting args as the
153 # `extra` kwarg for custom log handlers
154 context = {
155 'id': req.id,
156 'name': req.name,
157 'args': req.argsrepr,
158 'kwargs': req.kwargsrepr,
159 'eta': req.eta,
160 }
161 info(_app_trace.LOG_RECEIVED, context, extra={'data': context})
162 if (req.expires or req.id in revoked_tasks) and req.revoked():
163 return
164
165 signals.task_received.send(sender=consumer, request=req)
166
167 if task_sends_events:
168 send_event(
169 'task-received',
170 uuid=req.id, name=req.name,
171 args=req.argsrepr, kwargs=req.kwargsrepr,
172 root_id=req.root_id, parent_id=req.parent_id,
173 retries=req.request_dict.get('retries', 0),
174 eta=req.eta and req.eta.isoformat(),
175 expires=req.expires and req.expires.isoformat(),
176 )
177
178 bucket = None
179 eta = None
180 if req.eta:
181 try:
182 if req.utc:
183 eta = to_timestamp(to_system_tz(req.eta))
184 else:
185 eta = to_timestamp(req.eta, app.timezone)
186 except (OverflowError, ValueError) as exc:
187 error("Couldn't convert ETA %r to timestamp: %r. Task: %r",
188 req.eta, exc, req.info(safe=True), exc_info=True)
189 req.reject(requeue=False)

Calls 11

hybrid_to_proto2Function · 0.85
proto1_to_proto2Function · 0.85
infoFunction · 0.85
task_reservedFunction · 0.85
callbackFunction · 0.85
uses_utc_timezoneMethod · 0.80
rejectMethod · 0.80
revokedMethod · 0.45
sendMethod · 0.45
getMethod · 0.45
infoMethod · 0.45

Tested by 1