(
self,
_reflect_info: _ReflectionInfo,
table_key: TableKey,
table: sa_schema.Table,
cols_by_orig_name: Dict[str, sa_schema.Column[Any]],
include_columns: Optional[Collection[str]],
exclude_columns: Collection[str],
resolve_fks: bool,
_extend_on: Optional[Set[sa_schema.Table]],
reflection_options: Dict[str, Any],
)
| 1725 | table.primary_key._reload(pk_cols) |
| 1726 | |
| 1727 | def _reflect_fk( |
| 1728 | self, |
| 1729 | _reflect_info: _ReflectionInfo, |
| 1730 | table_key: TableKey, |
| 1731 | table: sa_schema.Table, |
| 1732 | cols_by_orig_name: Dict[str, sa_schema.Column[Any]], |
| 1733 | include_columns: Optional[Collection[str]], |
| 1734 | exclude_columns: Collection[str], |
| 1735 | resolve_fks: bool, |
| 1736 | _extend_on: Optional[Set[sa_schema.Table]], |
| 1737 | reflection_options: Dict[str, Any], |
| 1738 | ) -> None: |
| 1739 | fkeys = _reflect_info.foreign_keys.get(table_key, []) |
| 1740 | for fkey_d in fkeys: |
| 1741 | conname = fkey_d["name"] |
| 1742 | # look for columns by orig name in cols_by_orig_name, |
| 1743 | # but support columns that are in-Python only as fallback |
| 1744 | constrained_columns = [ |
| 1745 | cols_by_orig_name[c].key if c in cols_by_orig_name else c |
| 1746 | for c in fkey_d["constrained_columns"] |
| 1747 | ] |
| 1748 | |
| 1749 | if ( |
| 1750 | exclude_columns |
| 1751 | and set(constrained_columns).intersection(exclude_columns) |
| 1752 | or ( |
| 1753 | include_columns |
| 1754 | and set(constrained_columns).difference(include_columns) |
| 1755 | ) |
| 1756 | ): |
| 1757 | continue |
| 1758 | |
| 1759 | referred_schema = fkey_d["referred_schema"] |
| 1760 | referred_table = fkey_d["referred_table"] |
| 1761 | referred_columns = fkey_d["referred_columns"] |
| 1762 | refspec = [] |
| 1763 | if referred_schema is not None: |
| 1764 | if resolve_fks: |
| 1765 | sa_schema.Table( |
| 1766 | referred_table, |
| 1767 | table.metadata, |
| 1768 | schema=referred_schema, |
| 1769 | autoload_with=self.bind, |
| 1770 | _extend_on=_extend_on, |
| 1771 | _reflect_info=_reflect_info, |
| 1772 | **reflection_options, |
| 1773 | ) |
| 1774 | for column in referred_columns: |
| 1775 | refspec.append( |
| 1776 | ".".join([referred_schema, referred_table, column]) |
| 1777 | ) |
| 1778 | else: |
| 1779 | if resolve_fks: |
| 1780 | sa_schema.Table( |
| 1781 | referred_table, |
| 1782 | table.metadata, |
| 1783 | autoload_with=self.bind, |
| 1784 | schema=sa_schema.BLANK_SCHEMA, |
no test coverage detected