MCPcopy
hub / github.com/celery/celery / default

Function default

celery/worker/strategy.py:99–209  ·  view source on GitHub ↗

Default task execution strategy. Note: Strategies are here as an optimization, so sadly it's not very easy to override.

(task, app, consumer,
            info=logger.info, error=logger.error, task_reserved=task_reserved,
            to_system_tz=timezone.to_system, bytes=bytes,
            proto1_to_proto2=proto1_to_proto2)

Source from the content-addressed store, hash-verified

97
98
99def default(task, app, consumer,
100 info=logger.info, error=logger.error, task_reserved=task_reserved,
101 to_system_tz=timezone.to_system, bytes=bytes,
102 proto1_to_proto2=proto1_to_proto2):
103 """Default task execution strategy.
104
105 Note:
106 Strategies are here as an optimization, so sadly
107 it's not very easy to override.
108 """
109 hostname = consumer.hostname
110 connection_errors = consumer.connection_errors
111 _does_info = logger.isEnabledFor(logging.INFO)
112 # task event related
113 # (optimized to avoid calling request.send_event)
114 eventer = consumer.event_dispatcher
115 events = eventer and eventer.enabled
116 send_event = eventer and eventer.send
117 task_sends_events = events and task.send_events
118
119 call_at = consumer.timer.call_at
120 apply_eta_task = consumer.apply_eta_task
121 rate_limits_enabled = not consumer.disable_rate_limits
122 get_bucket = consumer.task_buckets.__getitem__
123 handle = consumer.on_task_request
124 limit_task = consumer._limit_task
125 limit_post_eta = consumer._limit_post_eta
126 Request = symbol_by_name(task.Request)
127 Req = create_request_cls(Request, task, consumer.pool, hostname, eventer,
128 app=app)
129
130 revoked_tasks = consumer.controller.state.revoked
131
132 def task_message_handler(message, body, ack, reject, callbacks,
133 to_timestamp=to_timestamp):
134 if body is None and 'args' not in message.payload:
135 body, headers, decoded, utc = (
136 message.body, message.headers, False, app.uses_utc_timezone(),
137 )
138 else:
139 if 'args' in message.payload:
140 body, headers, decoded, utc = hybrid_to_proto2(message,
141 message.payload)
142 else:
143 body, headers, decoded, utc = proto1_to_proto2(message, body)
144
145 req = Req(
146 message,
147 on_ack=ack, on_reject=reject, app=app, hostname=hostname,
148 eventer=eventer, task=task, connection_errors=connection_errors,
149 body=body, headers=headers, decoded=decoded, utc=utc,
150 )
151 if _does_info:
152 # Similar to `app.trace.info()`, we pass the formatting args as the
153 # `extra` kwarg for custom log handlers
154 context = {
155 'id': req.id,
156 'name': req.name,

Callers

nothing calls this directly

Calls 2

create_request_cls — Function · 0.85
isEnabledFor — Method · 0.80

Tested by

no test coverage detected