(self, _is_asyncio=False, _has_terminate=False)
| 470 | return p, canary |
| 471 | |
def _checkin_event_fixture(self, _is_asyncio=False, _has_terminate=False):
    """Return a pool plus a list that records pool events as they fire.

    The pool comes from ``self._queuepool_fixture``, with ``_is_asyncio``
    and ``_has_terminate`` passed straight through.  Listeners are
    attached for the "reset", "checkin", "close_detached" and "detach"
    events; each appends a string marker to the returned list:

    * "reset" appends ``"reset_rollback_ok"`` when ``state.asyncio_safe``
      is truthy, otherwise ``"reset_no_rollback"``.
    * the other three append their own event name.

    :return: a ``(pool, recorded_markers)`` tuple.
    """
    queue_pool = self._queuepool_fixture(
        _is_asyncio=_is_asyncio, _has_terminate=_has_terminate
    )
    recorded = []

    @event.listens_for(queue_pool, "reset")
    def on_reset(conn, rec, state):
        # The marker suffix reflects whether the reset state reports
        # itself as asyncio safe.
        outcome = "rollback_ok" if state.asyncio_safe else "no_rollback"
        recorded.append("reset_" + outcome)

    @event.listens_for(queue_pool, "checkin")
    def on_checkin(*args, **kwargs):
        recorded.append("checkin")

    @event.listens_for(queue_pool, "close_detached")
    def on_close_detached(*args, **kwargs):
        recorded.append("close_detached")

    @event.listens_for(queue_pool, "detach")
    def on_detach(*args, **kwargs):
        recorded.append("detach")

    return queue_pool, recorded
| 498 | |
| 499 | def _reset_event_fixture(self): |
| 500 | p = self._queuepool_fixture() |
no test coverage detected