()
def _allow_stale_update_impl(cfg):
    """Return a context manager that switches off InnoDB snapshot isolation.

    Nothing happens at call time; the work starts when the returned context
    manager is entered.  While it is active, an ``"engine_connect"`` listener
    on ``cfg.db`` issues ``SET innodb_snapshot_isolation = 'OFF'`` and then
    rolls back on each connection passed to it.  On exit, whether or not the
    managed block raised, the listener is removed and ``cfg.db`` is disposed.

    :param cfg: configuration object exposing ``db``; presumably a SQLAlchemy
        ``Engine`` (it is used as an event target and must provide
        ``dispose()``) -- confirm against the caller.
    :return: an un-entered context manager that yields nothing.
    """

    def _disable_snapshot_isolation(conn):
        # Invoked for every "engine_connect" event while the context is open.
        conn.exec_driver_sql("SET innodb_snapshot_isolation = 'OFF'")
        # NOTE(review): presumably ends whatever transaction the SET
        # statement began, so the connection is handed over clean.
        conn.rollback()

    @contextlib.contextmanager
    def _snapshot_isolation_disabled():
        event.listen(cfg.db, "engine_connect", _disable_snapshot_isolation)
        try:
            yield
        finally:
            event.remove(
                cfg.db, "engine_connect", _disable_snapshot_isolation
            )

            # Dispose the pool as a quick way to reset connections that
            # had the setting changed while the listener was active.
            cfg.db.dispose()

    return _snapshot_isolation_disabled()
no test coverage detected