MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / _validate_dialect_kwargs

Method _validate_dialect_kwargs

lib/sqlalchemy/sql/base.py:687–724  ·  view source on GitHub ↗
(self, kwargs: Dict[str, Any])

Source from the content-addressed store, hash-verified

685 return util.PopulateDict(self._kw_reg_for_dialect_cls)
686
687 def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
688 # validate remaining kwargs that they all specify DB prefixes
689
690 if not kwargs:
691 return
692
693 for k in kwargs:
694 m = re.match("^(.+?)_(.+)$", k)
695 if not m:
696 raise TypeError(
697 "Additional arguments should be "
698 "named <dialectname>_<argument>, got '%s'" % k
699 )
700 dialect_name, arg_name = m.group(1, 2)
701
702 try:
703 construct_arg_dictionary = self.dialect_options[dialect_name]
704 except exc.NoSuchModuleError:
705 util.warn(
706 "Can't validate argument %r; can't "
707 "locate any SQLAlchemy dialect named %r"
708 % (k, dialect_name)
709 )
710 self.dialect_options[dialect_name] = d = _DialectArgDict()
711 d._defaults.update({"*": None})
712 d._non_defaults[arg_name] = kwargs[k]
713 else:
714 if (
715 "*" not in construct_arg_dictionary
716 and arg_name not in construct_arg_dictionary
717 ):
718 raise exc.ArgumentError(
719 "Argument %r is not accepted by "
720 "dialect %r on behalf of %r"
721 % (k, dialect_name, self.__class__)
722 )
723 else:
724 construct_arg_dictionary[arg_name] = kwargs[k]
725
726
727class CompileState:

Callers 12

_extra_kwargsMethod · 0.80
_extra_kwargsMethod · 0.80
__init__Method · 0.80
__init__Method · 0.80
__init__Method · 0.80
fetchMethod · 0.80
with_dialect_optionsMethod · 0.80
__init__Method · 0.80
__init__Method · 0.80
reflect_tableMethod · 0.80
test_updateMethod · 0.80
goMethod · 0.80

Calls 4

_DialectArgDictClass · 0.85
matchMethod · 0.45
warnMethod · 0.45
updateMethod · 0.45

Tested by 2

test_updateMethod · 0.64
goMethod · 0.64