| 130 | revoked_tasks = consumer.controller.state.revoked |
| 131 | |
| 132 | def task_message_handler(message, body, ack, reject, callbacks, |
| 133 | to_timestamp=to_timestamp): |
| 134 | if body is None and 'args' not in message.payload: |
| 135 | body, headers, decoded, utc = ( |
| 136 | message.body, message.headers, False, app.uses_utc_timezone(), |
| 137 | ) |
| 138 | else: |
| 139 | if 'args' in message.payload: |
| 140 | body, headers, decoded, utc = hybrid_to_proto2(message, |
| 141 | message.payload) |
| 142 | else: |
| 143 | body, headers, decoded, utc = proto1_to_proto2(message, body) |
| 144 | |
| 145 | req = Req( |
| 146 | message, |
| 147 | on_ack=ack, on_reject=reject, app=app, hostname=hostname, |
| 148 | eventer=eventer, task=task, connection_errors=connection_errors, |
| 149 | body=body, headers=headers, decoded=decoded, utc=utc, |
| 150 | ) |
| 151 | if _does_info: |
| 152 | # Similar to `app.trace.info()`, we pass the formatting args as the |
| 153 | # `extra` kwarg for custom log handlers |
| 154 | context = { |
| 155 | 'id': req.id, |
| 156 | 'name': req.name, |
| 157 | 'args': req.argsrepr, |
| 158 | 'kwargs': req.kwargsrepr, |
| 159 | 'eta': req.eta, |
| 160 | } |
| 161 | info(_app_trace.LOG_RECEIVED, context, extra={'data': context}) |
| 162 | if (req.expires or req.id in revoked_tasks) and req.revoked(): |
| 163 | return |
| 164 | |
| 165 | signals.task_received.send(sender=consumer, request=req) |
| 166 | |
| 167 | if task_sends_events: |
| 168 | send_event( |
| 169 | 'task-received', |
| 170 | uuid=req.id, name=req.name, |
| 171 | args=req.argsrepr, kwargs=req.kwargsrepr, |
| 172 | root_id=req.root_id, parent_id=req.parent_id, |
| 173 | retries=req.request_dict.get('retries', 0), |
| 174 | eta=req.eta and req.eta.isoformat(), |
| 175 | expires=req.expires and req.expires.isoformat(), |
| 176 | ) |
| 177 | |
| 178 | bucket = None |
| 179 | eta = None |
| 180 | if req.eta: |
| 181 | try: |
| 182 | if req.utc: |
| 183 | eta = to_timestamp(to_system_tz(req.eta)) |
| 184 | else: |
| 185 | eta = to_timestamp(req.eta, app.timezone) |
| 186 | except (OverflowError, ValueError) as exc: |
| 187 | error("Couldn't convert ETA %r to timestamp: %r. Task: %r", |
| 188 | req.eta, exc, req.info(safe=True), exc_info=True) |
| 189 | req.reject(requeue=False) |