MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / _do_testqueuepool

Method _do_testqueuepool

test/engine/test_pool.py:1036–1084  ·  view source on GitHub ↗
(self, useclose=False)

Source from the content-addressed store, hash-verified

1034 self._do_testqueuepool(useclose=True)
1035
def _do_testqueuepool(self, useclose=False):
    """Check the pool's status counters through a checkout/return cycle.

    Six connections are checked out of a pool built with ``pool_size=3``
    and ``max_overflow=-1``, handed back in two batches of three, and
    then two are checked out again.  After every step the
    ``(size, checkedin, overflow, checkedout)`` tuple reported by the
    pool is compared with the expected values.

    :param useclose: when true, connections are returned by calling
     ``close()`` on them; otherwise the local references are rebound to
     ``None`` and ``lazy_gc()`` is called before the counters are
     checked.
    """
    p = self._queuepool_fixture(pool_size=3, max_overflow=-1)

    def status(pool):
        # Snapshot of the four counters under test, in a fixed order.
        return (
            pool.size(),
            pool.checkedin(),
            pool.overflow(),
            pool.checkedout(),
        )

    # All comparisons use eq_() (already used by this test for one of the
    # checks) instead of a bare truth test, so every step is asserted the
    # same way.
    # NOTE(review): eq_ is expected to report both operands on failure --
    # confirm against the testing helpers this module imports.

    # Check out six connections.  The expected tuples show the overflow
    # counter starting at -2 after the first checkout and rising by one
    # per checkout, reaching 0 at the third and 3 at the sixth.
    c1 = p.connect()
    eq_(status(p), (3, 0, -2, 1))
    c2 = p.connect()
    eq_(status(p), (3, 0, -1, 2))
    c3 = p.connect()
    eq_(status(p), (3, 0, 0, 3))
    c4 = p.connect()
    eq_(status(p), (3, 0, 1, 4))
    c5 = p.connect()
    eq_(status(p), (3, 0, 2, 5))
    c6 = p.connect()
    eq_(status(p), (3, 0, 3, 6))

    # Return three connections.  In the non-close variant the names are
    # rebound right here on purpose: handing the connections to a helper
    # or storing them in a container would leave extra references alive.
    # NOTE(review): presumably the pool reclaims a connection once its
    # last reference is dropped and lazy_gc() has run -- confirm.
    if useclose:
        c4.close()
        c3.close()
        c2.close()
    else:
        c4 = c3 = c2 = None
        lazy_gc()
    eq_(status(p), (3, 3, 3, 3))

    # Return the remaining three; checked-in stays at 3 while overflow
    # and checked-out both drop to 0.
    if useclose:
        c1.close()
        c5.close()
        c6.close()
    else:
        c1 = c5 = c6 = None
        lazy_gc()
    eq_(status(p), (3, 3, 0, 0))

    # Check two out again: checked-in falls to 1, overflow stays at 0.
    c1 = p.connect()
    c2 = p.connect()
    eq_(status(p), (3, 1, 0, 2))

    if useclose:
        c2.close()
    else:
        c2 = None
        lazy_gc()
    eq_(status(p), (3, 2, 0, 1))

    # The last connection is always closed explicitly, in both variants.
    c1.close()
1085
1086 def test_timeout_accessor(self):
1087 expected_timeout = 123

Callers 2

test_queuepool_del Method · 0.95
test_queuepool_close Method · 0.95

Calls 7

lazy_gc Function · 0.90
eq_ Function · 0.90
status Function · 0.85
_queuepool_fixture Method · 0.80
connect Method · 0.45
assert_ Method · 0.45
close Method · 0.45

Tested by

no test coverage detected