| 95 | super().on_after_fork() |
| 96 | |
def _reconnect_pubsub(self):
    """Rebuild the pub/sub handle on a freshly reset connection pool.

    Drops the current pub/sub object, resets the client's connection pool,
    replays the stored meta of every subscribed key through
    ``on_state_change``, and finally creates a new pub/sub object that is
    subscribed to whatever is still listed in ``subscribed_to``.
    """
    self._pubsub = None
    self.backend.client.connection_pool.reset()

    # Task state may have changed while the connection was down, so fetch
    # the meta for every subscribed task before switching to pubsub mode.
    if self.subscribed_to:
        stored = self.backend.client.mget(self.subscribed_to)
        # Keys without a stored value come back falsy and are skipped.
        for raw_meta in filter(None, stored):
            self.on_state_change(self._decode_result(raw_meta), None)

    pubsub = self.backend.client.pubsub(ignore_subscribe_messages=True)
    self._pubsub = pubsub

    # subscribed_to may be empty after the on_state_change calls above.
    if not self.subscribed_to:
        connection = pubsub.connection_pool.get_connection()
        pubsub.connection = connection
        # Even with nothing to subscribe to, the connect callback must not
        # be lost: on_connect re-subscribes to any channels we previously
        # subscribed to once the connection is (re-)established.
        connection.register_connect_callback(pubsub.on_connect)
        return

    pubsub.subscribe(*self.subscribed_to)
| 118 | |
| 119 | def _reconnect(self): |
| 120 | """Re-establish the Redis pub/sub connection with retry.""" |