(self, merge_type, user_address_fixture)
| 1359 | "empty", "persistent", "transient", argnames="merge_type" |
| 1360 | ) |
| 1361 | def test_merge_persistent(self, merge_type, user_address_fixture): |
| 1362 | addresses = self.tables.addresses |
| 1363 | User, Address = user_address_fixture( |
| 1364 | addresses_args={"order_by": addresses.c.email_address} |
| 1365 | ) |
| 1366 | sess = fixture_session(autoflush=False) |
| 1367 | |
| 1368 | a1 = Address(email_address="a1") |
| 1369 | a2 = Address(email_address="a2") |
| 1370 | a3 = Address(email_address="a3") |
| 1371 | u1 = User(name="jack", addresses=[a2, a3]) |
| 1372 | |
| 1373 | if merge_type == "transient": |
| 1374 | # merge transient. no collection iteration is implied by this. |
| 1375 | u1 = sess.merge(u1) |
| 1376 | sess.add(a1) |
| 1377 | else: |
| 1378 | sess.add_all([u1, a1]) |
| 1379 | sess.flush() |
| 1380 | |
| 1381 | if merge_type == "persistent": |
| 1382 | u1 = User(id=u1.id, name="jane", addresses=[a1, a3]) |
| 1383 | |
| 1384 | # for Dynamic, the list is iterated. it's been this way the |
| 1385 | # whole time, which is clearly not very useful for a |
| 1386 | # "collection that's too large to load". however we maintain |
| 1387 | # legacy behavior here |
| 1388 | u1 = sess.merge(u1) |
| 1389 | eq_(attributes.get_history(u1, "addresses"), ([a1], [a3], [a2])) |
| 1390 | |
| 1391 | sess.flush() |
| 1392 | |
| 1393 | if self.lazy == "dynamic": |
| 1394 | stmt = u1.addresses.statement |
| 1395 | else: |
| 1396 | stmt = u1.addresses.select() |
| 1397 | eq_(sess.scalars(stmt).all(), [a1, a3]) |
| 1398 | |
| 1399 | elif merge_type == "empty": |
| 1400 | # merge while omitting the "too large to load" collection |
| 1401 | # works fine. |
| 1402 | u1 = User(id=u1.id, name="jane") |
| 1403 | u1 = sess.merge(u1) |
| 1404 | |
| 1405 | eq_(attributes.get_history(u1, "addresses"), ([], [a2, a3], [])) |
| 1406 | |
| 1407 | sess.flush() |
| 1408 | |
| 1409 | if self.lazy == "dynamic": |
| 1410 | stmt = u1.addresses.statement |
| 1411 | else: |
| 1412 | stmt = u1.addresses.select() |
| 1413 | |
| 1414 | eq_(sess.scalars(stmt).all(), [a2, a3]) |
| 1415 | |
| 1416 | @testing.combinations(True, False, argnames="delete_cascade_configured") |
| 1417 | def test_delete_cascade( |
nothing calls this directly
no test coverage detected