Connect manually upon disconnection. If the Redis server is down, this will fail and raise a ConnectionError as desired. After reconnection, the ``on_connect`` callback should have been called by the connection to resubscribe us to any channels and patterns we were previously listening to.
(self, conn, command, *args, **kwargs)
| 1175 | await conn.connect() |
| 1176 | |
async def _execute(self, conn, command, *args, **kwargs):
    """
    Run ``command`` through the connection's retry wrapper and record
    duration / error telemetry for the call.

    Connect manually upon disconnection. If the Redis server is down,
    this will fail and raise a ConnectionError as desired.
    After reconnection, the ``on_connect`` callback should have been
    called by the connection to resubscribe us to any channels and
    patterns we were previously listening to.

    :param conn: connection the command runs on. Must expose
        ``retry.call_with_retry`` and ``db``; ``host`` and ``port`` are
        read when present and reported as ``None`` otherwise.
    :param command: callable invoked as ``command(*args, **kwargs)``
        inside the retry wrapper.
    :param args: positional arguments forwarded to ``command``. The first
        one, when present, is used as the command name for telemetry.
    :param kwargs: keyword arguments forwarded to ``command``.
    :return: the awaited result of ``conn.retry.call_with_retry``.
    :raises Exception: anything raised inside the ``try`` block is passed
        to ``record_error_count`` and then re-raised unchanged.
    """
    # The first positional argument doubles as the command name; with no
    # positional arguments there is no name and duration is not recorded.
    command_name = args[0] if args else None

    # Start timing for observability
    start_time = time.monotonic()
    # Track actual retry attempts for error reporting
    actual_retry_attempts = 0

    def failure_callback(error, failure_count):
        # Remember the latest failure count so the error path below can
        # report how many attempts were actually made.
        nonlocal actual_retry_attempts
        actual_retry_attempts = failure_count
        return self._reconnect(conn, error, failure_count, start_time, command_name)

    try:
        response = await conn.retry.call_with_retry(
            lambda: command(*args, **kwargs),
            failure_callback,
            with_failure_count=True,
        )

        # NOTE(review): duration recording sits inside the ``try``, so an
        # exception raised by it is also reported through
        # ``record_error_count`` - confirm this is intended.
        if command_name:
            await record_operation_duration(
                command_name=command_name,
                duration_seconds=time.monotonic() - start_time,
                server_address=getattr(conn, "host", None),
                server_port=getattr(conn, "port", None),
                db_namespace=str(conn.db),
            )

        return response
    except Exception as e:
        # Not every connection object is guaranteed to carry host/port
        # attributes, hence the defensive lookups. The same values are
        # reported as both server and network-peer address.
        host = getattr(conn, "host", None)
        port = getattr(conn, "port", None)
        await record_error_count(
            server_address=host,
            server_port=port,
            network_peer_address=host,
            network_peer_port=port,
            error_type=e,
            retry_attempts=actual_retry_attempts,
            is_internal=False,
        )
        raise
| 1228 | |
| 1229 | async def parse_response(self, block: bool = True, timeout: float = 0): |
| 1230 | """ |
no test coverage detected