Disconnects from the Redis server
(
self,
nowait: bool = False,
error: Optional[Exception] = None,
failure_count: Optional[int] = None,
health_check_failed: bool = False,
)
| 578 | raise ConnectionError("Invalid Database") |
| 579 | |
| 580 | async def disconnect( |
| 581 | self, |
| 582 | nowait: bool = False, |
| 583 | error: Optional[Exception] = None, |
| 584 | failure_count: Optional[int] = None, |
| 585 | health_check_failed: bool = False, |
| 586 | ) -> None: |
| 587 | """Disconnects from the Redis server""" |
| 588 | # On Python 3.13+, asyncio.timeout() raises RuntimeError when called |
| 589 | # outside a running Task (e.g. during GC finalization or event-loop |
| 590 | # callbacks). In that context we fall back to a synchronous close. |
| 591 | # See https://github.com/redis/redis-py/issues/3856 |
| 592 | if asyncio.current_task() is None: |
| 593 | self._parser.on_disconnect() |
| 594 | self.reset_should_reconnect() |
| 595 | self._close() |
| 596 | return |
| 597 | |
| 598 | try: |
| 599 | async with async_timeout(self.socket_connect_timeout): |
| 600 | self._parser.on_disconnect() |
| 601 | # Reset the reconnect flag |
| 602 | self.reset_should_reconnect() |
| 603 | if not self.is_connected: |
| 604 | return |
| 605 | try: |
| 606 | self._writer.close() # type: ignore[union-attr] |
| 607 | # wait for close to finish, except when handling errors and |
| 608 | # forcefully disconnecting. |
| 609 | if not nowait: |
| 610 | await self._writer.wait_closed() # type: ignore[union-attr] |
| 611 | except OSError: |
| 612 | pass |
| 613 | finally: |
| 614 | self._reader = None |
| 615 | self._writer = None |
| 616 | except asyncio.TimeoutError: |
| 617 | raise TimeoutError( |
| 618 | f"Timed out closing connection after {self.socket_connect_timeout}" |
| 619 | ) from None |
| 620 | |
| 621 | if error: |
| 622 | if health_check_failed: |
| 623 | close_reason = CloseReason.HEALTHCHECK_FAILED |
| 624 | else: |
| 625 | close_reason = CloseReason.ERROR |
| 626 | |
| 627 | if failure_count is not None and failure_count > self.retry.get_retries(): |
| 628 | await record_error_count( |
| 629 | server_address=getattr(self, "host", None), |
| 630 | server_port=getattr(self, "port", None), |
| 631 | network_peer_address=getattr(self, "host", None), |
| 632 | network_peer_port=getattr(self, "port", None), |
| 633 | error_type=error, |
| 634 | retry_attempts=failure_count, |
| 635 | ) |
| 636 | |
| 637 | await record_connection_closed( |
no test coverage detected