MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / _invoke_before_exec_event

Method _invoke_before_exec_event

lib/sqlalchemy/engine/base.py:1576–1616  ·  view source on GitHub ↗
(
        self,
        elem: Any,
        distilled_params: _CoreMultiExecuteParams,
        execution_options: _ExecuteOptions,
    )

Source from the content-addressed store, hash-verified

1574 return ret
1575
1576 def _invoke_before_exec_event(
1577 self,
1578 elem: Any,
1579 distilled_params: _CoreMultiExecuteParams,
1580 execution_options: _ExecuteOptions,
1581 ) -> Tuple[
1582 Any,
1583 _CoreMultiExecuteParams,
1584 _CoreMultiExecuteParams,
1585 _CoreSingleExecuteParams,
1586 ]:
1587 event_multiparams: _CoreMultiExecuteParams
1588 event_params: _CoreSingleExecuteParams
1589
1590 if len(distilled_params) == 1:
1591 event_multiparams, event_params = [], distilled_params[0]
1592 else:
1593 event_multiparams, event_params = distilled_params, {}
1594
1595 for fn in self.dispatch.before_execute:
1596 elem, event_multiparams, event_params = fn(
1597 self,
1598 elem,
1599 event_multiparams,
1600 event_params,
1601 execution_options,
1602 )
1603
1604 if event_multiparams:
1605 distilled_params = list(event_multiparams)
1606 if event_params:
1607 raise exc.InvalidRequestError(
1608 "Event handler can't return non-empty multiparams "
1609 "and params at the same time"
1610 )
1611 elif event_params:
1612 distilled_params = [event_params]
1613 else:
1614 distilled_params = []
1615
1616 return elem, distilled_params, event_multiparams, event_params
1617
1618 def _execute_clauseelement(
1619 self,

Callers 3

_execute_defaultMethod · 0.95
_execute_ddlMethod · 0.95

Calls

no outgoing calls

Tested by

no test coverage detected