| 1059 | |
| 1060 | |
| 1061 | class OSSMaintNotificationsHandler: |
def __init__(
    self,
    cluster_client: "MaintNotificationsAbstractRedisCluster",
    config: MaintNotificationsConfig,
) -> None:
    """Store the cluster client and config and start with empty tracking state.

    Args:
        cluster_client: Cluster client kept on the handler as
            ``cluster_client``.
        config: Maintenance-notifications configuration kept on the
            handler as ``config``.
    """
    # Notifications currently being handled, and notifications that have
    # already been handled; both are consulted to skip duplicates.
    self._in_progress = set()
    self._processed_notifications = set()
    # Re-entrant lock taken around every access to the two sets above.
    self._lock = threading.RLock()

    self.config = config
    self.cluster_client = cluster_client
| 1072 | |
def get_handler_for_connection(self):
    """Return a new handler object that shares this handler's tracking state.

    The new handler is built from the same cluster client and config. Its
    processed set, in-progress set and lock are then rebound to the very
    same objects this handler holds, so the state is shared rather than
    copied.

    Returns:
        A fresh ``OSSMaintNotificationsHandler`` sharing state with ``self``.
    """
    # Each connection gets its own handler object because each connection
    # can be in a different state; only the data below is shared.
    per_connection = OSSMaintNotificationsHandler(self.cluster_client, self.config)
    for shared_attr in ("_processed_notifications", "_in_progress", "_lock"):
        setattr(per_connection, shared_attr, getattr(self, shared_attr))
    return per_connection
| 1082 | |
def remove_expired_notifications(self):
    """Drop every expired notification from the processed set, in place.

    The set object itself is kept (only its contents change) because the
    same set is shared with handlers created by
    ``get_handler_for_connection``.
    """
    with self._lock:
        processed = self._processed_notifications
        # Collect first, then remove, so the set is never mutated while
        # it is being iterated.
        expired = [seen for seen in processed if seen.is_expired()]
        processed.difference_update(expired)
| 1088 | |
def handle_notification(self, notification: MaintenanceNotification):
    """Dispatch a maintenance notification to its handler method.

    Only ``OSSNodeMigratedNotification`` instances are handled; they are
    passed to ``handle_oss_maintenance_completed_notification``. Any other
    notification is reported through an error log and otherwise ignored.

    Args:
        notification: The notification to dispatch.
    """
    # Guard clause: anything that is not a node-migrated notification has
    # no handler here.
    if not isinstance(notification, OSSNodeMigratedNotification):
        logger.error(f"Unhandled notification type: {notification}")
        return

    self.handle_oss_maintenance_completed_notification(notification)
| 1094 | |
| 1095 | def handle_oss_maintenance_completed_notification( |
| 1096 | self, notification: OSSNodeMigratedNotification |
| 1097 | ): |
| 1098 | self.remove_expired_notifications() |
| 1099 | |
| 1100 | with self._lock: |
| 1101 | if ( |
| 1102 | notification in self._in_progress |
| 1103 | or notification in self._processed_notifications |
| 1104 | ): |
| 1105 | # we are already handling this notification or it has already been processed |
| 1106 | # we should skip in_progress notification since when we reinitialize the cluster |
| 1107 | # we execute a CLUSTER SLOTS command that can use a different connection |
| 1108 | # that has also has the notification and we don't want to |
| 1109 | # process the same notification twice |
| 1110 | return |
| 1111 | |
| 1112 | if logger.isEnabledFor(logging.DEBUG): |
| 1113 | logger.debug(f"Handling SMIGRATED notification: {notification}") |
| 1114 | self._in_progress.add(notification) |
| 1115 | |
| 1116 | # Extract the information about the src and destination nodes that are affected |
| 1117 | # by the maintenance. nodes_to_slots_mapping structure: |
| 1118 | # { |
no outgoing calls
no test coverage detected