hub / github.com/celery/celery / run

Method run

celery/backends/asynchronous.py:133–162
(self)

Source from the content-addressed store, hash-verified

        self._drain_complete_event = self._event()

    def run(self):
        self._started.set()

        try:
            while not self._stopped.is_set():
                try:
                    self.result_consumer.drain_events(timeout=1)
                    self._send_drain_complete_event()
                except socket.timeout:
                    pass
                except OSError:
                    # Recoverable connection errors (e.g. broker restart)
                    # are handled inside drain_events via reconnection.
                    # If something still leaks through, we log, back off
                    # briefly, and retry instead of spinning hot.
                    logging.warning(
                        'Drainer: connection error during drain_events, '
                        'will retry on next loop iteration.',
                        exc_info=True,
                    )
                    time.sleep(1)
        except Exception as e:
            self._exc = e
            raise
        finally:
            self._send_drain_complete_event()
            try:
                self._shutdown.set()
            except RuntimeError as e:
                logging.error(f"Failed to set shutdown event: {e}")

    def start(self):
        self._ensure_not_shut_down()
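
The control flow of run can be tried out in isolation with the sketch below. It is not Celery code: StubConsumer, SketchDrainer, the backoff parameter, the drained counter and the on_exhausted hook are hypothetical names invented for illustration, and plain threading.Event objects stand in for whatever event type the real drainer uses. It only aims to show the three outcomes of a drain attempt: a timeout is ignored, any other OSError is logged and retried after a short pause, and any other exception is stored, re-raised, and still followed by the shutdown signal.

```python
import logging
import socket
import threading
import time


class StubConsumer:
    """Hypothetical stand-in for result_consumer with scripted outcomes."""

    def __init__(self, outcomes):
        self._outcomes = list(outcomes)
        self.calls = 0
        self.on_exhausted = None  # optional callback, fired on the last outcome

    def drain_events(self, timeout=None):
        self.calls += 1
        outcome = self._outcomes.pop(0)
        if not self._outcomes and self.on_exhausted is not None:
            self.on_exhausted()
        if isinstance(outcome, Exception):
            raise outcome
        # Any non-exception outcome means "events drained successfully".


class SketchDrainer:
    """Illustrative loop only; mirrors the try/except layout of run above."""

    def __init__(self, consumer, backoff=0.01):
        self.result_consumer = consumer
        self.backoff = backoff  # assumption: the original sleeps a fixed 1 s
        self._stopped = threading.Event()
        self._shutdown = threading.Event()
        self._exc = None
        self.drained = 0

    def stop(self):
        self._stopped.set()

    def run(self):
        try:
            while not self._stopped.is_set():
                try:
                    self.result_consumer.drain_events(timeout=1)
                    self.drained += 1
                except socket.timeout:
                    # Listed before OSError because socket.timeout is a
                    # subclass of OSError and would otherwise be swallowed
                    # by the retry branch below.
                    pass
                except OSError:
                    logging.warning("drain failed, will retry", exc_info=True)
                    time.sleep(self.backoff)
        except Exception as e:
            # Anything that is not an OSError ends the loop.
            self._exc = e
            raise
        finally:
            # Runs on both clean stop and fatal error.
            self._shutdown.set()


def demo():
    consumer = StubConsumer([
        socket.timeout(),                          # ignored
        ConnectionResetError("broker restart"),    # logged, then retried
        None,                                      # successful drain
    ])
    drainer = SketchDrainer(consumer)
    consumer.on_exhausted = drainer.stop  # stop once the script is used up
    drainer.run()
    return consumer.calls, drainer.drained, drainer._shutdown.is_set()


if __name__ == "__main__":
    print(demo())
```

In this sketch the order of the except clauses matters: swapping them would route every timeout through the warning-and-sleep branch, since socket.timeout is itself an OSError.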

Callers

nothing calls this directly

Calls 6

is_set · Method · 0.80
set · Method · 0.45
drain_events · Method · 0.45
sleep · Method · 0.45
error · Method · 0.45

Tested by

no test coverage detected