MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / _execute_ddl

Method _execute_ddl

lib/sqlalchemy/engine/base.py:1523–1574  ·  view source on GitHub ↗

Execute a schema.DDL object.

(
        self,
        ddl: ExecutableDDLElement,
        distilled_parameters: _CoreMultiExecuteParams,
        execution_options: CoreExecuteOptionsParameter,
    )

Source from the content-addressed store, hash-verified

1521 return ret
1522
def _execute_ddl(
    self,
    ddl: ExecutableDDLElement,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> CursorResult[Unpack[TupleAny]]:
    """Compile a DDL construct and execute it on this connection.

    The execution options of the DDL element are merged with the
    connection's own options and the per-call ``execution_options``.
    When event listeners are present on the connection or its engine,
    the before-execute hook runs first (and may substitute the DDL
    element and parameters), and ``after_execute`` is dispatched once
    the statement has run.

    :param ddl: the DDL element to compile and execute.
    :param distilled_parameters: parameters handed to the
     before-execute hook; they are not forwarded to
     ``_execute_context`` by this method.
    :param execution_options: per-call execution options merged on top
     of the element's and the connection's options.
    :return: the result object produced by ``_execute_context``.
    """
    merged_options = ddl._execution_options.merge_with(
        self._execution_options, execution_options
    )

    event_multiparams: Optional[_CoreMultiExecuteParams]
    event_params: Optional[_CoreSingleExecuteParams]

    if not self._has_events and not self.engine._has_events:
        # No listeners anywhere: nothing to report to after_execute.
        event_multiparams = None
        event_params = None
    else:
        hooked = self._invoke_before_exec_event(
            ddl, distilled_parameters, merged_options
        )
        ddl, distilled_parameters, event_multiparams, event_params = hooked

    translate_map = merged_options.get("schema_translate_map", None)
    dialect = self.dialect

    compiled_ddl = ddl.compile(
        dialect=dialect,
        schema_translate_map=translate_map,
    )

    # The compiled object is passed twice, with None in between, exactly
    # as the DDL execution-context initializer is invoked here.
    result = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_ddl,
        compiled_ddl,
        None,
        merged_options,
        compiled_ddl,
    )

    # Re-check the listener flags rather than reusing the earlier answer,
    # so the after-hook decision reflects the state after execution.
    if not self._has_events and not self.engine._has_events:
        return result

    self.dispatch.after_execute(
        self, ddl, event_multiparams, event_params, merged_options, result
    )
    return result
1575
1576 def _invoke_before_exec_event(
1577 self,

Callers 1

Calls 6

_execute_context · Method · 0.95
merge_with · Method · 0.45
get · Method · 0.45
compile · Method · 0.45
after_execute · Method · 0.45

Tested by

no test coverage detected