MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / MockConnection

Class MockConnection

lib/sqlalchemy/engine/mock.py:34–69  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

32
33
34class MockConnection:
35 def __init__(self, dialect: Dialect, execute: Callable[..., Any]):
36 self._dialect = dialect
37 self._execute_impl = execute
38
39 engine: Engine = cast(Any, property(lambda s: s))
40 dialect: Dialect = cast(Any, property(attrgetter("_dialect")))
41 name: str = cast(Any, property(lambda s: s._dialect.name))
42
43 def connect(self, **kwargs: Any) -> MockConnection:
44 return self
45
46 def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]:
47 return obj.schema
48
49 def execution_options(self, **kw: Any) -> MockConnection:
50 return self
51
52 def _run_ddl_visitor(
53 self,
54 visitorcallable: Type[InvokeDDLBase],
55 element: Visitable,
56 **kwargs: Any,
57 ) -> None:
58 kwargs["checkfirst"] = False
59 visitorcallable(
60 dialect=self.dialect, connection=self, **kwargs
61 ).traverse_single(element)
62
63 def execute(
64 self,
65 obj: Executable,
66 parameters: Optional[_CoreAnyExecuteParams] = None,
67 execution_options: Optional[CoreExecuteOptionsParameter] = None,
68 ) -> Any:
69 return self._execute_impl(obj, parameters)
70
71
72def create_mock_engine(

Callers 1

create_mock_engineFunction · 0.85

Calls 1

castFunction · 0.50

Tested by

no test coverage detected