(self, stmt_type, from_stmt)
| 1148 | ) |
| 1149 | @testing.variation("from_stmt", [True, False]) |
| 1150 | def test_update_delete_flags(self, stmt_type, from_stmt): |
| 1151 | User, Address = self.classes("User", "Address") |
| 1152 | |
| 1153 | sess = Session(testing.db, future=True) |
| 1154 | |
| 1155 | canary = self._flag_fixture(sess) |
| 1156 | |
| 1157 | if stmt_type.delete: |
| 1158 | stmt = ( |
| 1159 | delete(User) |
| 1160 | .filter_by(id=18) |
| 1161 | .execution_options(synchronize_session="evaluate") |
| 1162 | ) |
| 1163 | elif stmt_type.update: |
| 1164 | stmt = ( |
| 1165 | update(User) |
| 1166 | .filter_by(id=18) |
| 1167 | .values(name="eighteen") |
| 1168 | .execution_options(synchronize_session="evaluate") |
| 1169 | ) |
| 1170 | elif stmt_type.insert: |
| 1171 | stmt = insert(User).values(name="eighteen") |
| 1172 | else: |
| 1173 | stmt_type.fail() |
| 1174 | |
| 1175 | if from_stmt: |
| 1176 | stmt = select(User).from_statement(stmt.returning(User)) |
| 1177 | |
| 1178 | result = sess.execute(stmt) |
| 1179 | result.close() |
| 1180 | |
| 1181 | eq_( |
| 1182 | canary.mock_calls, |
| 1183 | [ |
| 1184 | call.options( |
| 1185 | bind_mapper=inspect(User), |
| 1186 | all_mappers=[inspect(User)], |
| 1187 | is_select=False, |
| 1188 | is_from_statement=bool(from_stmt), |
| 1189 | is_insert=stmt_type.insert, |
| 1190 | is_update=stmt_type.update, |
| 1191 | is_delete=stmt_type.delete, |
| 1192 | is_orm_statement=True, |
| 1193 | is_relationship_load=False, |
| 1194 | is_column_load=False, |
| 1195 | lazy_loaded_from=None, |
| 1196 | ), |
| 1197 | ], |
| 1198 | ) |
| 1199 | |
| 1200 | def test_chained_events_two(self): |
| 1201 | sess = Session(testing.db, future=True) |
nothing calls this directly
no test coverage detected