MCPcopy
hub / github.com/celery/celery / _reconnect_pubsub

Method _reconnect_pubsub

celery/backends/redis.py:97–117  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

95 super().on_after_fork()
96
def _reconnect_pubsub(self):
    """Rebuild the pub/sub object after the connection has been down.

    Discards the current pub/sub object, resets the client's connection
    pool, replays the stored meta of every subscribed task through
    ``on_state_change``, and finally creates a fresh pub/sub object that
    is either re-subscribed or handed a connection with the ``on_connect``
    callback registered.
    """
    self._pubsub = None
    self.backend.client.connection_pool.reset()

    # Task state may have changed while the connection was down, so fetch
    # the meta of all subscribed tasks before switching to pub/sub mode.
    if self.subscribed_to:
        fetched = self.backend.client.mget(self.subscribed_to)
        for raw_meta in [entry for entry in fetched if entry]:
            self.on_state_change(self._decode_result(raw_meta), None)

    pubsub = self.backend.client.pubsub(ignore_subscribe_messages=True)
    self._pubsub = pubsub

    # subscribed_to may be empty by now: on_state_change can remove entries.
    if self.subscribed_to:
        pubsub.subscribe(*self.subscribed_to)
        return

    # Nothing to subscribe to, but the connect callback must not be lost:
    # on_connect re-subscribes to any previously subscribed channels once
    # the connection is established.
    connection = pubsub.connection_pool.get_connection()
    pubsub.connection = connection
    connection.register_connect_callback(pubsub.on_connect)
118
119 def _reconnect(self):
120 """Re-establish the Redis pub/sub connection with retry."""

Calls 4

on_state_changeMethod · 0.95
subscribeMethod · 0.80
resetMethod · 0.45
mgetMethod · 0.45