MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / _should_wrap_creator

Method _should_wrap_creator

lib/sqlalchemy/pool/base.py:328–359  ·  view source on GitHub ↗

Detect if creator accepts a single argument, or is sent as a legacy style no-arg function.

(
        self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
    )

Source from the content-addressed store, hash-verified

326 del self._invoke_creator
327
def _should_wrap_creator(
    self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
) -> _CreatorWRecFnType:
    """Detect if creator accepts a single argument, or is sent
    as a legacy style no-arg function.

    :param creator: the callable to inspect; either a no-argument
     function or one that accepts a single positional argument.
    :return: a callable that accepts one positional argument.  This is
     ``creator`` itself when its signature already takes exactly one
     required positional argument (or the exact
     ``connection_record=None`` form); otherwise a wrapper that
     ignores its argument and invokes ``creator()`` with no arguments.

    """

    try:
        # Inspect the callable that was actually passed in, so the
        # decision always matches the object being wrapped/returned
        # and does not depend on what is currently stored on ``self``.
        argspec = util.get_callable_argspec(creator, no_self=True)
    except TypeError:
        # The argspec lookup rejected this callable; fall back to
        # treating it as a legacy no-argument creator.
        creator_fn = cast(_CreatorFnType, creator)
        return lambda rec: creator_fn()

    if argspec.defaults is not None:
        defaulted = len(argspec.defaults)
    else:
        defaulted = 0
    # number of positional arguments that have no default value
    positionals = len(argspec[0]) - defaulted

    # look for the exact arg signature that DefaultStrategy
    # sends us
    if (argspec[0], argspec[3]) == (["connection_record"], (None,)):
        return cast(_CreatorWRecFnType, creator)
    # or just a single positional
    elif positionals == 1:
        return cast(_CreatorWRecFnType, creator)
    # all other cases, just wrap and assume legacy "creator" callable
    # thing
    else:
        creator_fn = cast(_CreatorFnType, creator)
        return lambda rec: creator_fn()
360
361 def _close_connection(
362 self, connection: DBAPIConnection, *, terminate: bool = False

Callers 1

_creatorMethod · 0.95

Calls 1

castFunction · 0.50

Tested by

no test coverage detected