MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / get_indexes

Method get_indexes

lib/sqlalchemy/dialects/mysql/base.py:3503–3551  ·  view source on GitHub ↗
(
        self,
        connection: Connection,
        table_name: str,
        schema: Optional[str] = None,
        **kw: Any,
    )

Source from the content-addressed store, hash-verified

3501
@reflection.cache
def get_indexes(
    self,
    connection: Connection,
    table_name: str,
    schema: Optional[str] = None,
    **kw: Any,
) -> list[ReflectedIndex]:
    """Return the reflected non-primary-key indexes of ``table_name``.

    The parsed table state is obtained through
    ``self._parsed_state_or_create`` and each entry of its ``keys`` is
    converted to a ``ReflectedIndex`` dictionary:

    * ``PRIMARY`` entries are skipped.
    * ``UNIQUE`` entries are reported with ``"unique": True``.
    * ``FULLTEXT`` / ``SPATIAL`` entries store their type under the
      ``<dialect name>_prefix`` dialect option.
    * any other non-``None`` type emits a warning and is reported as a
      plain, non-unique key.

    A truthy ``parser`` value and any non-``None`` per-column lengths are
    added as the ``<dialect name>_with_parser`` and
    ``<dialect name>_length`` dialect options respectively.  The
    ``dialect_options`` entry is only present when at least one option
    was collected.

    :param connection: passed through to ``_parsed_state_or_create``.
    :param table_name: passed through to ``_parsed_state_or_create``.
    :param schema: passed through to ``_parsed_state_or_create``.
    :param kw: extra keyword arguments, passed through unchanged.
    :return: the index dictionaries ordered by name, or
     ``ReflectionDefaults.indexes()`` when none were collected.
    """
    state = self._parsed_state_or_create(
        connection, table_name, schema, **kw
    )

    reflected: list[ReflectedIndex] = []

    for key_spec in state.keys:
        key_type = key_spec["type"]
        if key_type == "PRIMARY":
            # primary keys are not reported as indexes here
            continue

        options = {}
        is_unique = key_type == "UNIQUE"
        if not is_unique:
            if key_type in ("FULLTEXT", "SPATIAL"):
                options[f"{self.name}_prefix"] = key_type
            elif key_type is not None:
                util.warn(
                    f"Converting unknown KEY type {key_type} to a plain KEY"
                )

        parser = key_spec["parser"]
        if parser:
            options[f"{self.name}_with_parser"] = parser

        entry: ReflectedIndex = {
            "name": key_spec["name"],
            "column_names": [col[0] for col in key_spec["columns"]],
            "unique": is_unique,
        }

        # only columns that carry an explicit length are recorded
        lengths = {
            col[0]: col[1]
            for col in key_spec["columns"]
            if col[1] is not None
        }
        if lengths:
            options[f"{self.name}_length"] = lengths

        if options:
            entry["dialect_options"] = options

        reflected.append(entry)

    if not reflected:
        return ReflectionDefaults.indexes()

    # a falsy name is keyed as "~" so that unnamed entries sort last
    reflected.sort(key=lambda item: item["name"] or "~")
    return reflected
3552
3553 @reflection.cache
3554 def get_unique_constraints(

Callers

nothing calls this directly

Calls 5

indexesMethod · 0.80
warnMethod · 0.45
appendMethod · 0.45
sortMethod · 0.45

Tested by

no test coverage detected