(
compiler,
stmt,
compile_state,
parameters,
_getattr_col_key,
_column_as_key,
_col_bind_name,
check_columns,
values,
toplevel,
kw,
)
| 635 | |
| 636 | |
| 637 | def _scan_insert_from_select_cols( |
| 638 | compiler, |
| 639 | stmt, |
| 640 | compile_state, |
| 641 | parameters, |
| 642 | _getattr_col_key, |
| 643 | _column_as_key, |
| 644 | _col_bind_name, |
| 645 | check_columns, |
| 646 | values, |
| 647 | toplevel, |
| 648 | kw, |
| 649 | ): |
| 650 | cols = [stmt.table.c[_column_as_key(name)] for name in stmt._select_names] |
| 651 | |
| 652 | assert compiler.stack[-1]["selectable"] is stmt |
| 653 | |
| 654 | compiler.stack[-1]["insert_from_select"] = stmt.select |
| 655 | |
| 656 | add_select_cols: List[_CrudParamElementSQLExpr] = [] |
| 657 | if stmt.include_insert_from_select_defaults: |
| 658 | col_set = set(cols) |
| 659 | for col in stmt.table.columns: |
| 660 | # omit columns that were not in the SELECT statement. |
| 661 | # this will omit columns marked as omit_from_statements naturally, |
| 662 | # as long as that col was not explicit in the SELECT. |
| 663 | # if an omit_from_statements col has a "default" on it, then |
| 664 | # we need to include it, as these defaults should still fire off. |
| 665 | # but, if it has that default and it's the "sentinel" default, |
| 666 | # we don't do sentinel default operations for insert_from_select |
| 667 | # here so we again omit it. |
| 668 | if ( |
| 669 | col not in col_set |
| 670 | and col.default |
| 671 | and not col.default.is_sentinel |
| 672 | ): |
| 673 | cols.append(col) |
| 674 | |
| 675 | for c in cols: |
| 676 | col_key = _getattr_col_key(c) |
| 677 | if col_key in parameters and col_key not in check_columns: |
| 678 | parameters.pop(col_key) |
| 679 | values.append((c, compiler.preparer.format_column(c), None, ())) |
| 680 | else: |
| 681 | _append_param_insert_select_hasdefault( |
| 682 | compiler, stmt, c, add_select_cols, kw |
| 683 | ) |
| 684 | |
| 685 | if add_select_cols: |
| 686 | values.extend(add_select_cols) |
| 687 | ins_from_select = compiler.stack[-1]["insert_from_select"] |
| 688 | if not isinstance(ins_from_select, Select): |
| 689 | raise exc.CompileError( |
| 690 | f"Can't extend statement for INSERT..FROM SELECT to include " |
| 691 | f"additional default-holding column(s) " |
| 692 | f"""{ |
| 693 | ', '.join(repr(key) for _, key, _, _ in add_select_cols) |
| 694 | }. Convert the selectable to a subquery() first, or pass """ |
no test coverage detected