(self, conn: BaseConnection[Any])
| 140 | raise PoolClosed(f"the pool {self.name!r} is not open yet") |
| 141 | |
def _check_pool_putconn(self, conn: BaseConnection[Any]) -> None:
    """
    Verify that *conn* may be handed back to this pool.

    The connection's ``_pool`` attribute (treated as `!None` when missing)
    must be this very pool object; in that case the method returns silently.

    :raises ValueError: if the connection is attached to a different pool
        or to no pool at all. The message names this pool and says where
        the connection comes from.
    """
    owner = getattr(conn, "_pool", None)
    if owner is self:
        return

    # Describe the connection's provenance for the error message.
    origin = (
        f"it comes from pool {owner.name!r}"
        if owner
        else "it doesn't come from any pool"
    )
    raise ValueError(
        f"can't return connection to pool {self.name!r}, {origin}: {conn}"
    )
| 153 | |
| 154 | def get_stats(self) -> dict[str, int]: |
| 155 | """ |