Get a connected connection from the pool
(self, command_name=None, *keys, **options)
| 1550 | version="5.3.0", |
| 1551 | ) |
| 1552 | async def get_connection(self, command_name=None, *keys, **options): |
| 1553 | """Get a connected connection from the pool""" |
| 1554 | # Track connection count before to detect if a new connection is created |
| 1555 | async with self._lock: |
| 1556 | connections_before = len(self._available_connections) + len( |
| 1557 | self._in_use_connections |
| 1558 | ) |
| 1559 | start_time_created = time.monotonic() |
| 1560 | connection = self.get_available_connection() |
| 1561 | connections_after = len(self._available_connections) + len( |
| 1562 | self._in_use_connections |
| 1563 | ) |
| 1564 | is_created = connections_after > connections_before |
| 1565 | |
| 1566 | # Record state transition for observability |
| 1567 | # This ensures counters stay balanced if ensure_connection() fails and release() is called |
| 1568 | pool_name = get_pool_name(self) |
| 1569 | if is_created: |
| 1570 | # New connection created and acquired: just USED +1 |
| 1571 | await record_connection_count( |
| 1572 | pool_name=pool_name, |
| 1573 | connection_state=ConnectionState.USED, |
| 1574 | counter=1, |
| 1575 | ) |
| 1576 | else: |
| 1577 | # Existing connection acquired from pool: IDLE -> USED |
| 1578 | await record_connection_count( |
| 1579 | pool_name=pool_name, |
| 1580 | connection_state=ConnectionState.IDLE, |
| 1581 | counter=-1, |
| 1582 | ) |
| 1583 | await record_connection_count( |
| 1584 | pool_name=pool_name, |
| 1585 | connection_state=ConnectionState.USED, |
| 1586 | counter=1, |
| 1587 | ) |
| 1588 | |
| 1589 | # We now perform the connection check outside of the lock. |
| 1590 | try: |
| 1591 | await self.ensure_connection(connection) |
| 1592 | |
| 1593 | if is_created: |
| 1594 | await record_connection_create_time( |
| 1595 | connection_pool=self, |
| 1596 | duration_seconds=time.monotonic() - start_time_created, |
| 1597 | ) |
| 1598 | |
| 1599 | return connection |
| 1600 | except BaseException: |
| 1601 | await self.release(connection) |
| 1602 | raise |
| 1603 | |
| 1604 | def get_available_connection(self): |
| 1605 | """Get a connection from the pool, without making sure it is connected""" |
no test coverage detected