Default task execution strategy. Note: Strategies are here as an optimization, so sadly it's not very easy to override them.
(task, app, consumer,
info=logger.info, error=logger.error, task_reserved=task_reserved,
to_system_tz=timezone.to_system, bytes=bytes,
proto1_to_proto2=proto1_to_proto2)
| 97 | |
| 98 | |
| 99 | def default(task, app, consumer, |
| 100 | info=logger.info, error=logger.error, task_reserved=task_reserved, |
| 101 | to_system_tz=timezone.to_system, bytes=bytes, |
| 102 | proto1_to_proto2=proto1_to_proto2): |
| 103 | """Default task execution strategy. |
| 104 | |
| 105 | Note: |
| 106 | Strategies are here as an optimization, so sadly |
| 107 | it's not very easy to override. |
| 108 | """ |
| 109 | hostname = consumer.hostname |
| 110 | connection_errors = consumer.connection_errors |
| 111 | _does_info = logger.isEnabledFor(logging.INFO) |
| 112 | # task event related |
| 113 | # (optimized to avoid calling request.send_event) |
| 114 | eventer = consumer.event_dispatcher |
| 115 | events = eventer and eventer.enabled |
| 116 | send_event = eventer and eventer.send |
| 117 | task_sends_events = events and task.send_events |
| 118 | |
| 119 | call_at = consumer.timer.call_at |
| 120 | apply_eta_task = consumer.apply_eta_task |
| 121 | rate_limits_enabled = not consumer.disable_rate_limits |
| 122 | get_bucket = consumer.task_buckets.__getitem__ |
| 123 | handle = consumer.on_task_request |
| 124 | limit_task = consumer._limit_task |
| 125 | limit_post_eta = consumer._limit_post_eta |
| 126 | Request = symbol_by_name(task.Request) |
| 127 | Req = create_request_cls(Request, task, consumer.pool, hostname, eventer, |
| 128 | app=app) |
| 129 | |
| 130 | revoked_tasks = consumer.controller.state.revoked |
| 131 | |
| 132 | def task_message_handler(message, body, ack, reject, callbacks, |
| 133 | to_timestamp=to_timestamp): |
| 134 | if body is None and 'args' not in message.payload: |
| 135 | body, headers, decoded, utc = ( |
| 136 | message.body, message.headers, False, app.uses_utc_timezone(), |
| 137 | ) |
| 138 | else: |
| 139 | if 'args' in message.payload: |
| 140 | body, headers, decoded, utc = hybrid_to_proto2(message, |
| 141 | message.payload) |
| 142 | else: |
| 143 | body, headers, decoded, utc = proto1_to_proto2(message, body) |
| 144 | |
| 145 | req = Req( |
| 146 | message, |
| 147 | on_ack=ack, on_reject=reject, app=app, hostname=hostname, |
| 148 | eventer=eventer, task=task, connection_errors=connection_errors, |
| 149 | body=body, headers=headers, decoded=decoded, utc=utc, |
| 150 | ) |
| 151 | if _does_info: |
| 152 | # Similar to `app.trace.info()`, we pass the formatting args as the |
| 153 | # `extra` kwarg for custom log handlers |
| 154 | context = { |
| 155 | 'id': req.id, |
| 156 | 'name': req.name, |
nothing calls this directly
no test coverage detected