MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / _reflect_fk

Method _reflect_fk

lib/sqlalchemy/engine/reflection.py:1727–1814  ·  view source on GitHub ↗
(
        self,
        _reflect_info: _ReflectionInfo,
        table_key: TableKey,
        table: sa_schema.Table,
        cols_by_orig_name: Dict[str, sa_schema.Column[Any]],
        include_columns: Optional[Collection[str]],
        exclude_columns: Collection[str],
        resolve_fks: bool,
        _extend_on: Optional[Set[sa_schema.Table]],
        reflection_options: Dict[str, Any],
    )

Source from the content-addressed store, hash-verified

1725 table.primary_key._reload(pk_cols)
1726
1727 def _reflect_fk(
1728 self,
1729 _reflect_info: _ReflectionInfo,
1730 table_key: TableKey,
1731 table: sa_schema.Table,
1732 cols_by_orig_name: Dict[str, sa_schema.Column[Any]],
1733 include_columns: Optional[Collection[str]],
1734 exclude_columns: Collection[str],
1735 resolve_fks: bool,
1736 _extend_on: Optional[Set[sa_schema.Table]],
1737 reflection_options: Dict[str, Any],
1738 ) -> None:
1739 fkeys = _reflect_info.foreign_keys.get(table_key, [])
1740 for fkey_d in fkeys:
1741 conname = fkey_d["name"]
1742 # look for columns by orig name in cols_by_orig_name,
1743 # but support columns that are in-Python only as fallback
1744 constrained_columns = [
1745 cols_by_orig_name[c].key if c in cols_by_orig_name else c
1746 for c in fkey_d["constrained_columns"]
1747 ]
1748
1749 if (
1750 exclude_columns
1751 and set(constrained_columns).intersection(exclude_columns)
1752 or (
1753 include_columns
1754 and set(constrained_columns).difference(include_columns)
1755 )
1756 ):
1757 continue
1758
1759 referred_schema = fkey_d["referred_schema"]
1760 referred_table = fkey_d["referred_table"]
1761 referred_columns = fkey_d["referred_columns"]
1762 refspec = []
1763 if referred_schema is not None:
1764 if resolve_fks:
1765 sa_schema.Table(
1766 referred_table,
1767 table.metadata,
1768 schema=referred_schema,
1769 autoload_with=self.bind,
1770 _extend_on=_extend_on,
1771 _reflect_info=_reflect_info,
1772 **reflection_options,
1773 )
1774 for column in referred_columns:
1775 refspec.append(
1776 ".".join([referred_schema, referred_table, column])
1777 )
1778 else:
1779 if resolve_fks:
1780 sa_schema.Table(
1781 referred_table,
1782 table.metadata,
1783 autoload_with=self.bind,
1784 schema=sa_schema.BLANK_SCHEMA,

Callers 1

reflect_table · Method · 0.95

Calls 7

append_constraint · Method · 0.80
get · Method · 0.45
intersection · Method · 0.45
difference · Method · 0.45
append · Method · 0.45
join · Method · 0.45
warn · Method · 0.45

Tested by

no test coverage detected