MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / test_notify_waiters

Method test_notify_waiters

test/engine/test_pool.py:1394–1430  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

1392 @testing.requires.threading_with_mock
1393 @testing.requires.timing_intensive
1394 def test_notify_waiters(self):
1395 dbapi = MockDBAPI()
1396
1397 canary = []
1398
1399 def creator():
1400 canary.append(1)
1401 return dbapi.connect()
1402
1403 p1 = pool.QueuePool(
1404 creator=creator, pool_size=1, timeout=None, max_overflow=0
1405 )
1406
1407 def waiter(p):
1408 conn = p.connect()
1409 canary.append(2)
1410 time.sleep(0.5)
1411 conn.close()
1412
1413 c1 = p1.connect()
1414
1415 threads = []
1416 for i in range(5):
1417 t = threading.Thread(target=waiter, args=(p1,))
1418 t.start()
1419 threads.append(t)
1420 time.sleep(0.5)
1421 eq_(canary, [1])
1422
1423 # this also calls invalidate()
1424 # on c1
1425 p1._invalidate(c1)
1426
1427 for t in threads:
1428 t.join(join_timeout)
1429
1430 eq_(canary, [1, 1, 2, 2, 2, 2, 2])
1431
1432 def test_dispose_closes_pooled(self):
1433 dbapi = MockDBAPI()

Callers

nothing calls this directly

Calls 7

eq_Function · 0.90
_invalidateMethod · 0.80
MockDBAPIFunction · 0.70
connectMethod · 0.45
startMethod · 0.45
appendMethod · 0.45
joinMethod · 0.45

Tested by

no test coverage detected