MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / _test_overflow

Method _test_overflow

test/engine/test_pool.py:1176–1214  ·  view source on GitHub ↗
(self, thread_count, max_overflow)

Source from the content-addressed store, hash-verified

1174 assert t < 14, "Not all timeouts were < 14 seconds %r" % timeouts
1175
def _test_overflow(self, thread_count, max_overflow):
    """Stress a ``QueuePool`` from several threads and check its overflow.

    Starts ``thread_count`` threads that each attempt ten checkouts
    against a pool built with ``pool_size=3``, ``timeout=2`` and the
    given ``max_overflow``.  While holding a connection, each thread
    records ``overflow()`` as reported by the pool; the largest recorded
    value must not exceed ``max_overflow``.  Finally the
    ``ConnectionKiller`` tracking the pool is asked to assert that all
    connections are closed.

    :param thread_count: number of worker threads to start.
    :param max_overflow: ``max_overflow`` passed to the pool, and the
     upper bound asserted on the observed overflow.

    """
    killer = testing.engines.ConnectionKiller()
    mock_dbapi = MockDBAPI()
    connect_lock = threading.Lock()

    def slow_creator():
        # The delay happens before the lock is taken, so several threads
        # can be "connecting" at once; only the DBAPI call is serialized.
        time.sleep(0.05)
        with connect_lock:
            return mock_dbapi.connect()

    queue_pool = pool.QueuePool(
        creator=slow_creator,
        pool_size=3,
        timeout=2,
        max_overflow=max_overflow,
    )
    killer.add_pool(queue_pool)

    # Overflow readings taken by the workers while a connection is held.
    observed_overflow = []

    def hammer():
        attempts_left = 10
        while attempts_left:
            attempts_left -= 1
            try:
                checked_out = queue_pool.connect()
                time.sleep(0.005)
                observed_overflow.append(queue_pool.overflow())
                checked_out.close()
                del checked_out
            except tsa.exc.TimeoutError:
                # A checkout that timed out is simply skipped.
                continue

    workers = [
        threading.Thread(target=hammer) for _ in range(thread_count)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        # ``join_timeout`` comes from the enclosing test module.
        worker.join(join_timeout)

    highest_overflow = max(observed_overflow)
    self.assert_(highest_overflow <= max_overflow)

    killer.assert_all_closed()
1215
1216 def test_overflow_reset_on_failed_connect(self):
1217 dbapi = Mock()

Callers 2

test_no_overflowMethod · 0.95
test_max_overflowMethod · 0.95

Calls 8

add_poolMethod · 0.95
assert_all_closedMethod · 0.95
maxClass · 0.85
MockDBAPIFunction · 0.70
startMethod · 0.45
appendMethod · 0.45
joinMethod · 0.45
assert_Method · 0.45

Tested by

no test coverage detected