MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / MockDBAPI

Function MockDBAPI

test/engine/test_reconnect.py:131–166  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

129
130
def MockDBAPI():
    """Build a mock DBAPI module whose connections can be broken on demand.

    The returned ``Mock`` exposes:

    * ``connect`` -- returns a new ``mock_connection()`` and records it in
      ``connections``; raises ``MockDisconnect`` while the database is
      stopped.
    * ``shutdown(explode="execute", stop=False)`` -- sets ``explode`` on
      every recorded connection and optionally stops the database.
    * ``restart()`` -- un-stops the database and forgets all recorded
      connections.
    * ``dispose()`` -- un-stops the database, resets ``explode`` to ``None``
      on every recorded connection, then forgets them.
    * ``paramstyle`` -- the string ``"named"``.
    * ``connections`` -- the live list of connections handed out so far.
    * ``Error`` -- ``MockError``.
    """
    connections = []
    # One-element list so the nested functions can mutate the flag in place.
    stopped = [False]

    def connect(*args, **kwargs):
        """Hand out a new mock connection, or fail if the DB is stopped.

        This is a plain callable rather than a generator on purpose: a
        generator used as ``side_effect`` is exhausted for good the first
        time it raises, so every later ``connect()`` would raise
        ``StopIteration`` even after ``restart()`` / ``dispose()``.
        Arguments are accepted and ignored, matching the behaviour of an
        iterator ``side_effect``.
        """
        if stopped[0]:
            raise MockDisconnect("database is stopped")
        conn = mock_connection()
        connections.append(conn)
        return conn

    def shutdown(explode="execute", stop=False):
        """Mark every recorded connection as exploding; optionally stop."""
        stopped[0] = stop
        for c in connections:
            # NOTE(review): presumably read by mock_connection() to decide
            # which operation fails -- confirm against its definition.
            c.explode = explode

    def restart():
        """Allow new connections again and forget the recorded ones."""
        stopped[0] = False
        # Slice-assign so the list object exposed on the Mock stays the same.
        connections[:] = []

    def dispose():
        """Like ``restart()``, but first clear ``explode`` on each connection."""
        stopped[0] = False
        for c in connections:
            c.explode = None
        connections[:] = []

    return Mock(
        connect=Mock(side_effect=connect),
        shutdown=Mock(side_effect=shutdown),
        dispose=Mock(side_effect=dispose),
        restart=Mock(side_effect=restart),
        paramstyle="named",
        connections=connections,
        Error=MockError,
    )
167
168
169class PrePingMockTest(fixtures.TestBase):

Callers 3

setup_testMethod · 0.70
setup_testMethod · 0.70
_fixtureMethod · 0.70

Calls 1

connectFunction · 0.70

Tested by

no test coverage detected