(self)
| 1279 | ) |
| 1280 | |
def test_upsert_from_select(self):
    """Compile an INSERT..FROM SELECT guarded by an UPDATE..RETURNING CTE.

    The UPDATE against ``orders`` is wrapped in a CTE named ``upsert``;
    the INSERT selects a row of literals filtered by ``NOT EXISTS`` over
    that CTE.  The test checks the rendered SQL string, the bound
    parameter values, and that the compiled statement reports
    ``isinsert``.
    """
    orders = table(
        "orders",
        column("region"),
        column("amount"),
        column("product"),
        column("quantity"),
    )

    # UPDATE ... RETURNING <all columns>, exposed as the CTE "upsert".
    update_stmt = orders.update().where(orders.c.region == "Region1")
    update_stmt = update_stmt.values(
        amount=1.0, product="Product1", quantity=1
    )
    upsert_cte = update_stmt.returning(*orders.c._all_columns).cte("upsert")

    # Row of literals, filtered by NOT EXISTS against the CTE.
    literal_row = select(
        literal("Region1"),
        literal(1.0),
        literal("Product1"),
        literal(1),
    )
    guarded_row = literal_row.where(~exists(upsert_cte.select()))

    stmt = orders.insert().from_select(orders.c.keys(), guarded_row)

    expected_sql = (
        "WITH upsert AS (UPDATE orders SET amount=:param_5, "
        "product=:param_6, quantity=:param_7 "
        "WHERE orders.region = :region_1 "
        "RETURNING orders.region, orders.amount, "
        "orders.product, orders.quantity) "
        "INSERT INTO orders (region, amount, product, quantity) "
        "SELECT :param_1 AS anon_1, :param_2 AS anon_2, "
        ":param_3 AS anon_3, :param_4 AS anon_4 WHERE NOT (EXISTS "
        "(SELECT upsert.region, upsert.amount, upsert.product, "
        "upsert.quantity FROM upsert))"
    )
    # param_1..param_4 are the SELECT literals; param_5..param_7 are the
    # UPDATE SET values; region_1 is the UPDATE's WHERE comparison value.
    expected_params = {
        "param_1": "Region1",
        "param_2": 1.0,
        "param_3": "Product1",
        "param_4": 1,
        "param_5": 1.0,
        "param_6": "Product1",
        "param_7": 1,
        "region_1": "Region1",
    }

    self.assert_compile(stmt, expected_sql, checkparams=expected_params)

    # The statement must still compile as an INSERT despite the CTE.
    eq_(stmt.compile().isinsert, True)
| 1333 | |
| 1334 | @testing.combinations( |
| 1335 | ("default_enhanced",), |
nothing calls this directly
no test coverage detected