MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / _checkin_event_fixture

Method _checkin_event_fixture

test/engine/test_pool.py:472–497  ·  view source on GitHub ↗
(self, _is_asyncio=False, _has_terminate=False)

Source from the content-addressed store, hash-verified

470 return p, canary
471
472 def _checkin_event_fixture(self, _is_asyncio=False, _has_terminate=False):
473 p = self._queuepool_fixture(
474 _is_asyncio=_is_asyncio, _has_terminate=_has_terminate
475 )
476 canary = []
477
478 @event.listens_for(p, "reset")
479 def reset(conn, rec, state):
480 canary.append(
481 "reset_"
482 f"{'rollback_ok' if state.asyncio_safe else 'no_rollback'}"
483 )
484
485 @event.listens_for(p, "checkin")
486 def checkin(*arg, **kw):
487 canary.append("checkin")
488
489 @event.listens_for(p, "close_detached")
490 def close_detached(*arg, **kw):
491 canary.append("close_detached")
492
493 @event.listens_for(p, "detach")
494 def detach(*arg, **kw):
495 canary.append("detach")
496
497 return p, canary
498
499 def _reset_event_fixture(self):
500 p = self._queuepool_fixture()

Calls 1

_queuepool_fixtureMethod · 0.80

Tested by

no test coverage detected