(self)
| 1814 | |
| 1815 | @testing.requires.timing_intensive |
| 1816 | def test_recycle_pool_no_race(self): |
| 1817 | def slow_close(): |
| 1818 | slow_closing_connection._slow_close() |
| 1819 | time.sleep(0.5) |
| 1820 | |
| 1821 | slow_closing_connection = Mock() |
| 1822 | slow_closing_connection.connect.return_value.close = slow_close |
| 1823 | |
| 1824 | class Error(Exception): |
| 1825 | pass |
| 1826 | |
| 1827 | dialect = Mock() |
| 1828 | dialect.is_disconnect = lambda *arg, **kw: True |
| 1829 | dialect.dispatch = Mock(handle_error=[]) |
| 1830 | dialect.dbapi.Error = dialect.loaded_dbapi.Error = Error |
| 1831 | |
| 1832 | pools = [] |
| 1833 | |
| 1834 | class TrackQueuePool(pool.QueuePool): |
| 1835 | def __init__(self, *arg, **kw): |
| 1836 | pools.append(self) |
| 1837 | super().__init__(*arg, **kw) |
| 1838 | |
| 1839 | def creator(): |
| 1840 | return slow_closing_connection.connect() |
| 1841 | |
| 1842 | p1 = TrackQueuePool(creator=creator, pool_size=20) |
| 1843 | |
| 1844 | from sqlalchemy import create_engine |
| 1845 | |
| 1846 | eng = create_engine(testing.db.url, pool=p1, _initialize=False) |
| 1847 | eng.dialect = dialect |
| 1848 | |
| 1849 | # 15 total connections |
| 1850 | conns = [eng.connect() for i in range(15)] |
| 1851 | |
| 1852 | # return 7 back to the pool (conns[3:10] is indices 3..9) |
| 1853 | for conn in conns[3:10]: |
| 1854 | conn.close() |
| 1855 | |
| 1856 | def attempt(conn): |
| 1857 | time.sleep(random.random()) |
| 1858 | try: |
| 1859 | conn._handle_dbapi_exception( |
| 1860 | Error(), "statement", {}, Mock(), Mock() |
| 1861 | ) |
| 1862 | except tsa.exc.DBAPIError: |
| 1863 | pass |
| 1864 | |
| 1865 | # run an error + invalidate operation on every entry in conns: |
| 1866 | # the 8 connections still open plus the 7 closed above |
| 1867 | threads = [] |
| 1868 | for conn in conns: |
| 1869 | t = threading.Thread(target=attempt, args=(conn,)) |
| 1870 | t.start() |
| 1871 | threads.append(t) |
| 1872 | |
| 1873 | for t in threads: |
nothing calls this directly
no test coverage detected