MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / test_waiters_handled

Method test_waiters_handled

test/engine/test_pool.py:1300–1351  ·  view source on GitHub ↗

test that threads waiting for connections are handled when the pool is replaced.

(self)

Source from the content-addressed store, hash-verified

1298 @testing.requires.threading_with_mock
1299 @testing.requires.timing_intensive
1300 def test_waiters_handled(self):
1301 """test that threads waiting for connections are
1302 handled when the pool is replaced.
1303
1304 """
1305 mutex = threading.Lock()
1306 dbapi = MockDBAPI()
1307
1308 def creator():
1309 with mutex:
1310 return dbapi.connect()
1311
1312 success = []
1313 for timeout in (None, 30):
1314 for max_overflow in (0, -1, 3):
1315 p = pool.QueuePool(
1316 creator=creator,
1317 pool_size=2,
1318 timeout=timeout,
1319 max_overflow=max_overflow,
1320 )
1321
1322 def waiter(p, timeout, max_overflow):
1323 success_key = (timeout, max_overflow)
1324 conn = p.connect()
1325 success.append(success_key)
1326 time.sleep(0.1)
1327 conn.close()
1328
1329 c1 = p.connect() # noqa
1330 c2 = p.connect()
1331
1332 threads = []
1333 for i in range(2):
1334 t = threading.Thread(
1335 target=waiter, args=(p, timeout, max_overflow)
1336 )
1337 t.daemon = True
1338 t.start()
1339 threads.append(t)
1340
1341 # this sleep makes sure that the
1342 # two waiter threads hit upon wait()
1343 # inside the queue, before we invalidate the other
1344 # two conns
1345 time.sleep(0.2)
1346 p._invalidate(c2)
1347
1348 for t in threads:
1349 t.join(join_timeout)
1350
1351 eq_(len(success), 12, "successes: %s" % success)
1352
1353 def test_connrec_invalidated_within_checkout_no_race(self):
1354 """Test that a concurrent ConnectionRecord.invalidate() which

Callers

nothing calls this directly

Calls 7

eq_Function · 0.90
_invalidateMethod · 0.80
MockDBAPIFunction · 0.70
connectMethod · 0.45
startMethod · 0.45
appendMethod · 0.45
joinMethod · 0.45

Tested by

no test coverage detected