Connect manually upon disconnection. If the Redis server is down, this will fail and raise a ConnectionError as desired. After reconnection, the ``on_connect`` callback should have been called by the connection to resubscribe us to any channels and patterns we were previously listening to.
(self, conn, command, *args, **kwargs)
| 1146 | conn.connect() |
| 1147 | |
def _execute(self, conn, command, *args, **kwargs):
    """
    Run ``command(*args, **kwargs)`` through ``conn.retry.call_with_retry``
    and record observability metrics about the outcome.

    Connect manually upon disconnection. If the Redis server is down,
    this will fail and raise a ConnectionError as desired.
    After reconnection, the ``on_connect`` callback should have been
    called by the connection to resubscribe us to any channels and
    patterns we were previously listening to.

    The first extra positional argument, when present, is used as the
    command name for metrics. When that name is truthy and the call
    succeeds, the elapsed time is passed to ``record_operation_duration``.
    When any exception escapes, ``record_error_count`` is called with the
    last failure count handed to the failure callback, and the exception
    is re-raised unchanged.

    ``conn`` must provide ``should_reconnect()`` and ``retry``; ``db`` is
    read when a duration is recorded. ``host`` and ``port`` are optional
    and reported as ``None`` when absent.

    Returns whatever ``conn.retry.call_with_retry`` returns.
    """
    if conn.should_reconnect():
        self._reconnect(conn)

    # Callers that pass no extra positional arguments have no command name
    # to report, so no duration is recorded for them.
    command_name = args[0] if args else None

    # Start timing for observability
    start_time = time.monotonic()
    # Last failure count handed to the failure callback; stays 0 when the
    # callback is never invoked.
    retry_attempts = 0

    def failure_callback(error, failure_count):
        nonlocal retry_attempts
        retry_attempts = failure_count
        self._reconnect(conn, error, failure_count, start_time, command_name)

    # The duration recording is deliberately kept inside the ``try`` so that
    # an exception raised while recording is still counted and re-raised by
    # the handler below.
    try:
        response = conn.retry.call_with_retry(
            lambda: command(*args, **kwargs),
            failure_callback,
            with_failure_count=True,
        )

        if command_name:
            record_operation_duration(
                command_name=command_name,
                duration_seconds=time.monotonic() - start_time,
                server_address=getattr(conn, "host", None),
                server_port=getattr(conn, "port", None),
                db_namespace=str(conn.db),
            )

        return response
    except Exception as e:
        record_error_count(
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            network_peer_address=getattr(conn, "host", None),
            network_peer_port=getattr(conn, "port", None),
            error_type=e,
            retry_attempts=retry_attempts,
            is_internal=False,
        )
        raise
| 1202 | |
| 1203 | def parse_response(self, block=True, timeout=0): |
| 1204 | """ |