(
self, delete_cascade_configured, user_address_fixture
)
| 1415 | |
@testing.combinations(True, False, argnames="delete_cascade_configured")
def test_delete_cascade(
    self, delete_cascade_configured, user_address_fixture
):
    """Check what happens to a user's addresses when the user is deleted.

    A user with six addresses is persisted and then deleted.  When
    ``delete_cascade_configured`` is true the relationship is set up
    with ``cascade="all, delete"`` and the addresses table is expected
    to be empty afterwards; otherwise the relationship uses
    ``cascade="save-update"`` and all six rows are expected to remain
    with a NULL ``user_id``.
    """
    from sqlalchemy import case

    addresses = self.tables.addresses

    if delete_cascade_configured:
        cascade_setting = "all, delete"
    else:
        cascade_setting = "save-update"

    User, Address = user_address_fixture(
        addresses_args={
            "order_by": addresses.c.id,
            "backref": "user",
            "cascade": cascade_setting,
        }
    )

    sess = fixture_session(autoflush=True)

    user = User(name="ed")
    user.addresses.add_all(
        [Address(email_address=letter) for letter in "abcdef"]
    )
    sess.add(user)
    sess.commit()

    def user_id_is_null():
        # Builds a new CASE construct on every call, so the SELECT list
        # and the GROUP BY each receive their own expression object.
        # "== None" is intentional: it is the SQL expression form of
        # an IS NULL test, not a Python identity comparison.
        return case(
            (addresses.c.user_id == None, True),  # noqa: E711
            else_=False,
        )

    # the CASE expression is repeated in GROUP BY together with the
    # plain column; this byzantine form is so the query works on MSSQL
    isnull_stmt = select(user_id_is_null(), func.count("*")).group_by(
        user_id_is_null(),
        addresses.c.user_id,
    )

    def counts_by_null_flag():
        # Maps the "user_id is NULL" flag of each group to its row count.
        return {flag: total for flag, total in sess.execute(isnull_stmt)}

    # before the delete, all six addresses point at the user
    eq_(counts_by_null_flag(), {False: 6})

    sess.delete(user)
    sess.commit()

    if delete_cascade_configured:
        # delete cascade: no address rows should be left at all
        count_stmt = select(func.count("*")).select_from(addresses)
        remaining = sess.connection().execute(count_stmt).scalar()
        eq_(remaining, 0)
    else:
        # no delete cascade: rows remain, with user_id set to NULL
        eq_(counts_by_null_flag(), {True: 6})
nothing calls this directly
no test coverage detected