test that threads waiting for connections are handled when the pool is replaced.
(self)
| 1298 | @testing.requires.threading_with_mock |
| 1299 | @testing.requires.timing_intensive |
| 1300 | def test_waiters_handled(self): |
| 1301 | """test that threads waiting for connections are |
| 1302 | handled when the pool is replaced. |
| 1303 | |
| 1304 | """ |
| 1305 | mutex = threading.Lock() |
| 1306 | dbapi = MockDBAPI() |
| 1307 | |
| 1308 | def creator(): |
| 1309 | with mutex: |
| 1310 | return dbapi.connect() |
| 1311 | |
| 1312 | success = [] |
| 1313 | for timeout in (None, 30): |
| 1314 | for max_overflow in (0, -1, 3): |
| 1315 | p = pool.QueuePool( |
| 1316 | creator=creator, |
| 1317 | pool_size=2, |
| 1318 | timeout=timeout, |
| 1319 | max_overflow=max_overflow, |
| 1320 | ) |
| 1321 | |
| 1322 | def waiter(p, timeout, max_overflow): |
| 1323 | success_key = (timeout, max_overflow) |
| 1324 | conn = p.connect() |
| 1325 | success.append(success_key) |
| 1326 | time.sleep(0.1) |
| 1327 | conn.close() |
| 1328 | |
| 1329 | c1 = p.connect() # noqa |
| 1330 | c2 = p.connect() |
| 1331 | |
| 1332 | threads = [] |
| 1333 | for i in range(2): |
| 1334 | t = threading.Thread( |
| 1335 | target=waiter, args=(p, timeout, max_overflow) |
| 1336 | ) |
| 1337 | t.daemon = True |
| 1338 | t.start() |
| 1339 | threads.append(t) |
| 1340 | |
| 1341 | # this sleep makes sure that the |
| 1342 | # two waiter threads hit upon wait() |
| 1343 | # inside the queue, before we invalidate the other |
| 1344 | # two conns |
| 1345 | time.sleep(0.2) |
| 1346 | p._invalidate(c2) |
| 1347 | |
| 1348 | for t in threads: |
| 1349 | t.join(join_timeout) |
| 1350 | |
| 1351 | eq_(len(success), 12, "successes: %s" % success) |
| 1352 | |
| 1353 | def test_connrec_invalidated_within_checkout_no_race(self): |
| 1354 | """Test that a concurrent ConnectionRecord.invalidate() which |