(self, connection, metadata)
| 1212 | Table("is", meta2, autoload_with=connection) |
| 1213 | |
| 1214 | def test_reflect_all(self, connection, metadata): |
| 1215 | names = ["rt_%s" % name for name in ("a", "b", "c", "d", "e")] |
| 1216 | nameset = set(names) |
| 1217 | |
| 1218 | baseline = metadata |
| 1219 | for name in names: |
| 1220 | Table(name, baseline, Column("id", sa.Integer, primary_key=True)) |
| 1221 | baseline.create_all(connection) |
| 1222 | |
| 1223 | m1 = MetaData() |
| 1224 | is_false(m1.tables) |
| 1225 | m1.reflect(connection) |
| 1226 | is_true(nameset.issubset(set(m1.tables.keys()))) |
| 1227 | |
| 1228 | m2 = MetaData() |
| 1229 | m2.reflect(connection, only=["rt_a", "rt_b"]) |
| 1230 | eq_(set(m2.tables.keys()), {"rt_a", "rt_b"}) |
| 1231 | |
| 1232 | m3 = MetaData() |
| 1233 | m3.reflect(connection, only=lambda name, meta: name == "rt_c") |
| 1234 | eq_(set(m3.tables.keys()), {"rt_c"}) |
| 1235 | |
| 1236 | m4 = MetaData() |
| 1237 | |
| 1238 | assert_raises_message( |
| 1239 | sa.exc.InvalidRequestError, |
| 1240 | r"Could not reflect: requested table\(s\) not available in " |
| 1241 | r"Engine\(.*?\): \(rt_f\)", |
| 1242 | m4.reflect, |
| 1243 | connection, |
| 1244 | only=["rt_a", "rt_f"], |
| 1245 | ) |
| 1246 | |
| 1247 | m5 = MetaData() |
| 1248 | m5.reflect(connection, only=[]) |
| 1249 | is_false(m5.tables) |
| 1250 | |
| 1251 | m6 = MetaData() |
| 1252 | m6.reflect(connection, only=lambda n, m: False) |
| 1253 | is_false(m6.tables) |
| 1254 | |
| 1255 | m7 = MetaData() |
| 1256 | m7.reflect(connection) |
| 1257 | is_true(nameset.issubset(set(m7.tables.keys()))) |
| 1258 | |
| 1259 | m8_e1 = MetaData() |
| 1260 | rt_c = Table("rt_c", m8_e1) |
| 1261 | m8_e1.reflect(connection, extend_existing=True) |
| 1262 | eq_(set(m8_e1.tables.keys()), set(names)) |
| 1263 | eq_(rt_c.c.keys(), ["id"]) |
| 1264 | |
| 1265 | m8_e2 = MetaData() |
| 1266 | rt_c = Table("rt_c", m8_e2) |
| 1267 | m8_e2.reflect(connection, extend_existing=True, only=["rt_a", "rt_c"]) |
| 1268 | eq_(set(m8_e2.tables.keys()), {"rt_a", "rt_c"}) |
| 1269 | eq_(rt_c.c.keys(), ["id"]) |
| 1270 | |
| 1271 | baseline.drop_all(connection) |
nothing calls this directly
no test coverage detected