MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / _AdhocProxiedConnection

Class _AdhocProxiedConnection

lib/sqlalchemy/pool/base.py:1134–1190  ·  view source on GitHub ↗

Provides the :class:`.PoolProxiedConnection` interface for cases where the DBAPI connection is not actually proxied. This is used by the engine internals to pass a consistent :class:`.PoolProxiedConnection` object to consuming dialects in response to pool events that may not always have the :class:`._ConnectionFairy` available.

Source from the content-addressed store, hash-verified

1132
1133
class _AdhocProxiedConnection(PoolProxiedConnection):
    """Provide the :class:`.PoolProxiedConnection` interface for cases where
    the DBAPI connection is not actually proxied.

    This is used by the engine internals to pass a consistent
    :class:`.PoolProxiedConnection` object to consuming dialects in response to
    pool events that may not always have the :class:`._ConnectionFairy`
    available.

    Attribute access that is not satisfied by this object is forwarded to
    the wrapped DBAPI connection.

    """

    __slots__ = ("dbapi_connection", "_connection_record", "_is_valid")

    dbapi_connection: DBAPIConnection
    _connection_record: ConnectionPoolEntry
    _is_valid: bool

    def __init__(
        self,
        dbapi_connection: DBAPIConnection,
        connection_record: ConnectionPoolEntry,
    ):
        """Wrap ``dbapi_connection`` together with its pool entry.

        :param dbapi_connection: the DBAPI connection to expose.
        :param connection_record: the pool entry consulted for
         ``driver_connection`` and ``record_info``.

        """
        self.dbapi_connection = dbapi_connection
        self._connection_record = connection_record
        # a new adhoc proxy starts out valid; only invalidate() changes this
        self._is_valid = True

    @property
    def driver_connection(self) -> Any:  # type: ignore[override]  # mypy#4125
        """Return the ``driver_connection`` of the connection record."""
        return self._connection_record.driver_connection

    @property
    def connection(self) -> DBAPIConnection:
        """Return the wrapped DBAPI connection.

        Same object as the ``dbapi_connection`` attribute.

        """
        return self.dbapi_connection

    @property
    def is_valid(self) -> bool:
        """Implement is_valid state attribute.

        For the adhoc proxied connection this is ``True`` from construction
        until :meth:`invalidate` is called on this object, after which it is
        ``False``.  The flag is local to this object.

        """
        return self._is_valid

    def invalidate(
        self, e: Optional[BaseException] = None, soft: bool = False
    ) -> None:
        """Mark this adhoc proxied connection as no longer valid.

        Only the local validity flag reported by :attr:`is_valid` is
        changed.

        :param e: accepted but not used by this implementation.
        :param soft: accepted but not used by this implementation.

        """
        self._is_valid = False

    @util.ro_non_memoized_property
    def record_info(self) -> Optional[_InfoType]:
        """Return the ``record_info`` of the connection record."""
        return self._connection_record.record_info

    def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
        """Return a new cursor from the wrapped DBAPI connection.

        All positional and keyword arguments are passed through unchanged.

        """
        return self.dbapi_connection.cursor(*args, **kwargs)

    def __getattr__(self, key: Any) -> Any:
        """Forward otherwise-unresolved attribute access to the DBAPI
        connection.

        :raises AttributeError: if ``dbapi_connection`` itself has not been
         assigned, or if the DBAPI connection lacks the attribute.

        """
        # __getattr__ only runs after normal lookup failed.  If the name
        # being looked up is the ``dbapi_connection`` slot itself, the slot
        # is unset (e.g. an instance created via __new__ during copying);
        # reading it below would re-enter __getattr__ without end.
        if key == "dbapi_connection":
            raise AttributeError(key)
        return getattr(self.dbapi_connection, key)
1191

Callers 1

first_connectFunction · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected