MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / _reflect_indexes

Method _reflect_indexes

lib/sqlalchemy/engine/reflection.py:1823–1887  ·  view source on GitHub ↗
(
        self,
        _reflect_info: _ReflectionInfo,
        table_key: TableKey,
        table: sa_schema.Table,
        cols_by_orig_name: Dict[str, sa_schema.Column[Any]],
        include_columns: Optional[Collection[str]],
        exclude_columns: Collection[str],
        reflection_options: Dict[str, Any],
    )

Source from the content-addressed store, hash-verified

1821 }
1822
1823 def _reflect_indexes(
1824 self,
1825 _reflect_info: _ReflectionInfo,
1826 table_key: TableKey,
1827 table: sa_schema.Table,
1828 cols_by_orig_name: Dict[str, sa_schema.Column[Any]],
1829 include_columns: Optional[Collection[str]],
1830 exclude_columns: Collection[str],
1831 reflection_options: Dict[str, Any],
1832 ) -> None:
1833 # Indexes
1834 indexes = _reflect_info.indexes.get(table_key, [])
1835 for index_d in indexes:
1836 name = index_d["name"]
1837 columns = index_d["column_names"]
1838 expressions = index_d.get("expressions")
1839 column_sorting = index_d.get("column_sorting", {})
1840 unique = index_d["unique"]
1841 flavor = index_d.get("type", "index")
1842 dialect_options = index_d.get("dialect_options", {})
1843
1844 duplicates = index_d.get("duplicates_constraint")
1845 if include_columns and not set(columns).issubset(include_columns):
1846 continue
1847 if duplicates:
1848 continue
1849 # look for columns by orig name in cols_by_orig_name,
1850 # but support columns that are in-Python only as fallback
1851 idx_element: Any
1852 idx_elements = []
1853 for index, c in enumerate(columns):
1854 if c is None:
1855 if not expressions:
1856 util.warn(
1857 f"Skipping {flavor} {name!r} because key "
1858 f"{index + 1} reflected as None but no "
1859 "'expressions' were returned"
1860 )
1861 break
1862 idx_element = sql.text(expressions[index])
1863 else:
1864 try:
1865 if c in cols_by_orig_name:
1866 idx_element = cols_by_orig_name[c]
1867 else:
1868 idx_element = table.c[c]
1869 except KeyError:
1870 util.warn(
1871 f"{flavor} key {c!r} was not located in "
1872 f"columns for table {table.name!r}"
1873 )
1874 continue
1875 for option in column_sorting.get(c, ()):
1876 if option in self._index_sort_exprs:
1877 op = self._index_sort_exprs[option]
1878 idx_element = op(idx_element)
1879 idx_elements.append(idx_element)
1880 else:

Callers 1

reflect_table · Method · 0.95

Calls 5

op · Function · 0.85
get · Method · 0.45
issubset · Method · 0.45
warn · Method · 0.45
append · Method · 0.45

Tested by

no test coverage detected