MCPcopy
hub / github.com/redis/redis-py / OSSMaintNotificationsHandler

Class OSSMaintNotificationsHandler

redis/maint_notifications.py:1061–1205  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

1059
1060
1061class OSSMaintNotificationsHandler:
def __init__(
    self,
    cluster_client: "MaintNotificationsAbstractRedisCluster",
    config: MaintNotificationsConfig,
) -> None:
    """Store the cluster client and config and create empty tracking state.

    Args:
        cluster_client: Cluster client kept on the handler.
        config: Maintenance-notifications configuration kept on the handler.
    """
    self.cluster_client = cluster_client
    self.config = config
    # Notifications that were already handled; entries are dropped again by
    # remove_expired_notifications() once they report is_expired().
    self._processed_notifications = set()
    # Notifications whose handling has started.
    self._in_progress = set()
    # Re-entrant lock taken around reads/writes of the two sets above.
    self._lock = threading.RLock()
1072
def get_handler_for_connection(self):
    """Return a new handler for one connection that shares this one's state.

    The new instance is built from the same cluster client and config and
    then pointed at the *same* ``_processed_notifications`` set,
    ``_in_progress`` set and lock objects as this handler, so the
    bookkeeping is common to every handler derived from it.

    Returns:
        A new ``OSSMaintNotificationsHandler`` sharing this handler's
        tracking sets and lock.
    """
    # Copy all data that should be shared between connections
    # but each connection should have its own pool handler
    # since each connection can be in a different state
    handler = OSSMaintNotificationsHandler(self.cluster_client, self.config)
    # Rebind (not copy) the containers so updates are visible to all handlers.
    handler._processed_notifications = self._processed_notifications
    handler._in_progress = self._in_progress
    handler._lock = self._lock
    return handler
1082
def remove_expired_notifications(self):
    """Drop every processed notification whose ``is_expired()`` is true.

    The set is mutated in place (never rebound) because the same set
    object may be shared with handlers created by
    ``get_handler_for_connection``.
    """
    with self._lock:
        # Collect first, then remove, so the set is not resized while it
        # is being iterated.
        expired = [
            notification
            for notification in self._processed_notifications
            if notification.is_expired()
        ]
        self._processed_notifications.difference_update(expired)
1088
def handle_notification(self, notification: MaintenanceNotification):
    """Dispatch a maintenance notification to its handler.

    Only ``OSSNodeMigratedNotification`` instances are handled; they are
    forwarded to ``handle_oss_maintenance_completed_notification``. Any
    other notification is reported through ``logger.error`` and otherwise
    ignored.

    Args:
        notification: The notification to dispatch.
    """
    if not isinstance(notification, OSSNodeMigratedNotification):
        logger.error(f"Unhandled notification type: {notification}")
        return

    self.handle_oss_maintenance_completed_notification(notification)
1094
1095 def handle_oss_maintenance_completed_notification(
1096 self, notification: OSSNodeMigratedNotification
1097 ):
1098 self.remove_expired_notifications()
1099
1100 with self._lock:
1101 if (
1102 notification in self._in_progress
1103 or notification in self._processed_notifications
1104 ):
1105 # we are already handling this notification or it has already been processed
1106 # we should skip in_progress notification since when we reinitialize the cluster
1107 # we execute a CLUSTER SLOTS command that can use a different connection
1108 # that also has the notification and we don't want to
1109 # process the same notification twice
1110 return
1111
1112 if logger.isEnabledFor(logging.DEBUG):
1113 logger.debug(f"Handling SMIGRATED notification: {notification}")
1114 self._in_progress.add(notification)
1115
1116 # Extract the information about the src and destination nodes that are affected
1117 # by the maintenance. nodes_to_slots_mapping structure:
1118 # {

Callers 2

__init__ · Method · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected