(
self,
connection: Connection,
table_name: str,
schema: Optional[str] = None,
**kw: Any,
)
| 3501 | |
| 3502 | @reflection.cache |
| 3503 | def get_indexes( |
| 3504 | self, |
| 3505 | connection: Connection, |
| 3506 | table_name: str, |
| 3507 | schema: Optional[str] = None, |
| 3508 | **kw: Any, |
| 3509 | ) -> list[ReflectedIndex]: |
| 3510 | parsed_state = self._parsed_state_or_create( |
| 3511 | connection, table_name, schema, **kw |
| 3512 | ) |
| 3513 | |
| 3514 | indexes: list[ReflectedIndex] = [] |
| 3515 | |
| 3516 | for spec in parsed_state.keys: |
| 3517 | dialect_options = {} |
| 3518 | unique = False |
| 3519 | flavor = spec["type"] |
| 3520 | if flavor == "PRIMARY": |
| 3521 | continue |
| 3522 | if flavor == "UNIQUE": |
| 3523 | unique = True |
| 3524 | elif flavor in ("FULLTEXT", "SPATIAL"): |
| 3525 | dialect_options[f"{self.name}_prefix"] = flavor |
| 3526 | elif flavor is not None: |
| 3527 | util.warn( |
| 3528 | f"Converting unknown KEY type {flavor} to a plain KEY" |
| 3529 | ) |
| 3530 | |
| 3531 | if spec["parser"]: |
| 3532 | dialect_options[f"{self.name}_with_parser"] = spec["parser"] |
| 3533 | |
| 3534 | index_d: ReflectedIndex = { |
| 3535 | "name": spec["name"], |
| 3536 | "column_names": [s[0] for s in spec["columns"]], |
| 3537 | "unique": unique, |
| 3538 | } |
| 3539 | |
| 3540 | mysql_length = { |
| 3541 | s[0]: s[1] for s in spec["columns"] if s[1] is not None |
| 3542 | } |
| 3543 | if mysql_length: |
| 3544 | dialect_options[f"{self.name}_length"] = mysql_length |
| 3545 | |
| 3546 | if dialect_options: |
| 3547 | index_d["dialect_options"] = dialect_options |
| 3548 | |
| 3549 | indexes.append(index_d) |
| 3550 | indexes.sort(key=lambda d: d["name"] or "~") # sort None as last |
| 3551 | return indexes if indexes else ReflectionDefaults.indexes() |
| 3552 | |
| 3553 | @reflection.cache |
| 3554 | def get_unique_constraints( |
nothing calls this directly
no test coverage detected