Execute a schema.DDL object.
(
self,
ddl: ExecutableDDLElement,
distilled_parameters: _CoreMultiExecuteParams,
execution_options: CoreExecuteOptionsParameter,
)
| 1521 | return ret |
| 1522 | |
def _execute_ddl(
    self,
    ddl: ExecutableDDLElement,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> CursorResult[Unpack[TupleAny]]:
    """Compile and run a schema.DDL construct on this connection.

    The statement's own execution options are merged with the
    connection-level options and the per-call ``execution_options``.
    The ``schema_translate_map`` entry of the merged options, if any,
    is handed to the compile step.  When event listeners are present on
    the connection or its engine, the before-execute hook is invoked
    ahead of compilation and ``after_execute`` is dispatched once the
    execution context has produced a result.

    :param ddl: the DDL element to compile and execute; it may be
     replaced by the value returned from the before-execute hook.
    :param distilled_parameters: parameters handed to the
     before-execute hook; they are not forwarded to the execution
     context.
    :param execution_options: per-call options merged into the
     effective execution options.
    :return: the result produced by ``_execute_context``.
    """

    merged_opts = ddl._execution_options.merge_with(
        self._execution_options, execution_options
    )

    # Event payloads stay None unless the before-execute hook supplies
    # them below.
    event_multiparams: Optional[_CoreMultiExecuteParams] = None
    event_params: Optional[_CoreSingleExecuteParams] = None

    if self._has_events or self.engine._has_events:
        hook_result = self._invoke_before_exec_event(
            ddl, distilled_parameters, merged_opts
        )
        (
            ddl,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = hook_result

    translate_map = merged_opts.get("schema_translate_map", None)
    active_dialect = self.dialect
    compiled_ddl = ddl.compile(
        dialect=active_dialect, schema_translate_map=translate_map
    )

    # NOTE(review): the compiled object is passed in two positions and
    # the fourth argument is None -- presumably "statement", "no
    # parameters" and a constructor argument for _init_ddl; confirm
    # against _execute_context.
    result = self._execute_context(
        active_dialect,
        active_dialect.execution_ctx_cls._init_ddl,
        compiled_ddl,
        None,
        merged_opts,
        compiled_ddl,
    )

    # The listener check is repeated rather than cached so that it
    # reflects the flags as they are after execution.
    if self._has_events or self.engine._has_events:
        self.dispatch.after_execute(
            self,
            ddl,
            event_multiparams,
            event_params,
            merged_opts,
            result,
        )

    return result
| 1575 | |
| 1576 | def _invoke_before_exec_event( |
| 1577 | self, |
no test coverage detected