MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / _test_cleanup

Method _test_cleanup

test/engine/test_pool.py:2225–2274  ·  view source on GitHub ↗

Test that the pool's connections are OK after cleanup() has been called.

(self, strong_refs)

Source from the content-addressed store, hash-verified

2223 # self._test_cleanup(True)
2224
def _test_cleanup(self, strong_refs):
    """Hammer a SingletonThreadPool from several threads and verify
    that the pool's connections are OK after cleanup() has been called.

    Ten threads each check out, use and close a connection ten times
    against a pool created with ``pool_size=3``.  Once the threads have
    been joined, ``p._all_conns`` must hold between 3 and 4 entries.

    :param strong_refs: when true, every DBAPI connection handed out
     is also recorded in a local set, and exactly 3 of the recorded
     connections must never have had ``close()`` called on them.
    """

    mock_dbapi = MockDBAPI()
    connect_mutex = threading.Lock()

    def creator():
        # the mock iterator isn't threadsafe, so serialize connect()
        with connect_mutex:
            return mock_dbapi.connect()

    p = pool.SingletonThreadPool(creator=creator, pool_size=3)

    # populated only when the caller asked for strong references
    held = set() if strong_refs else None

    def _conn():
        conn = p.connect()
        if held is not None:
            held.add(conn.dbapi_connection)
        return conn

    def checkout():
        for _ in range(10):
            conn = _conn()
            assert conn
            conn.cursor()
            conn.close()
            time.sleep(0.01)

    workers = [threading.Thread(target=checkout) for _ in range(10)]
    for worker in workers:
        worker.start()
    for worker in workers:
        # bounded wait, so a stuck worker cannot hang the test run
        worker.join(join_timeout)

    is_true(3 <= len(p._all_conns) <= 4)

    if strong_refs:
        never_closed = sum(
            1 for dbapi_conn in held if not dbapi_conn.close.call_count
        )
        eq_(never_closed, 3)
2275
2276 def test_no_rollback_from_nested_connections(self):
2277 dbapi = MockDBAPI()

Callers 1

test_cleanup · Method · 0.95

Calls 6

is_true · Function · 0.90
eq_ · Function · 0.90
MockDBAPI · Function · 0.70
start · Method · 0.45
append · Method · 0.45
join · Method · 0.45

Tested by

no test coverage detected