| 1034 | self._do_testqueuepool(useclose=True) |
| 1035 | |
def _do_testqueuepool(self, useclose=False):
    """Check pool status counters through a checkout / release cycle.

    A pool is obtained from ``self._queuepool_fixture`` with
    ``pool_size=3`` and ``max_overflow=-1``, six connections are checked
    out one at a time, then released in two batches of three, and
    finally two more are checked out and released again.  After every
    step the ``(size, checkedin, overflow, checkedout)`` tuple reported
    by the pool is compared against the expected value.

    :param useclose: when true, connections are released by calling
     ``close()`` on them; otherwise the local references are set to
     ``None`` and ``lazy_gc()`` is invoked (presumably so that garbage
     collection hands the connections back to the pool -- confirm
     against the ``lazy_gc`` helper).
    """
    p = self._queuepool_fixture(pool_size=3, max_overflow=-1)

    def status(pool):
        """Return ``(size, checkedin, overflow, checkedout)`` for *pool*."""
        return (
            pool.size(),
            pool.checkedin(),
            pool.overflow(),
            pool.checkedout(),
        )

    # Check out six connections.  The expected tuples show overflow()
    # reading -2 after the first checkout and rising by one with each
    # further checkout, while checkedin() stays at 0.
    c1 = p.connect()
    eq_(status(p), (3, 0, -2, 1))
    c2 = p.connect()
    eq_(status(p), (3, 0, -1, 2))
    c3 = p.connect()
    eq_(status(p), (3, 0, 0, 3))
    c4 = p.connect()
    eq_(status(p), (3, 0, 1, 4))
    c5 = p.connect()
    eq_(status(p), (3, 0, 2, 5))
    c6 = p.connect()
    eq_(status(p), (3, 0, 3, 6))

    # Release the first batch of three: they show up as checked in,
    # and the overflow counter is expected to remain at 3.
    if useclose:
        c4.close()
        c3.close()
        c2.close()
    else:
        # Drop every local reference so nothing keeps the connections
        # alive before lazy_gc() runs.
        c4 = c3 = c2 = None
        lazy_gc()
    eq_(status(p), (3, 3, 3, 3))

    # Release the remaining three: checkedin() is expected to stay at 3
    # while overflow() and checkedout() both fall back to 0.
    if useclose:
        c1.close()
        c5.close()
        c6.close()
    else:
        c1 = c5 = c6 = None
        lazy_gc()
    eq_(status(p), (3, 3, 0, 0))

    # Check out two again; one connection is expected to remain
    # checked in and overflow() to stay at 0.
    c1 = p.connect()
    c2 = p.connect()
    eq_(status(p), (3, 1, 0, 2))
    if useclose:
        c2.close()
    else:
        c2 = None
        lazy_gc()
    eq_(status(p), (3, 2, 0, 1))

    # The last connection is always closed explicitly, regardless of
    # ``useclose``.
    c1.close()
| 1085 | |
| 1086 | def test_timeout_accessor(self): |
| 1087 | expected_timeout = 123 |