(self, cls)
| 3228 | |
@contextmanager
def _proxy_fixture(self, cls):
    """Run the enclosed block with the dialect using fetch strategy ``cls``.

    Sets ``self.table`` to ``self.tables.test`` and, while the ``with``
    block is active, replaces ``self.engine.dialect.execution_ctx_cls``
    with an execution context subclass whose ``post_exec`` hook installs
    ``cls`` as the context's ``cursor_fetch_strategy``.  The patch object
    is also stored on ``self.patcher``.

    :param cls: one of ``_cursor.CursorFetchStrategy``,
     ``_cursor.BufferedRowCursorFetchStrategy`` or
     ``_cursor.FullyBufferedCursorFetchStrategy``.
    :raises AssertionError: raised from the patched ``post_exec`` hook
     when ``cls`` is none of the strategies listed above.
    """
    self.table = self.tables.test

    class ExcCtx(default.DefaultExecutionContext):
        def post_exec(self):
            # NOTE: ``self`` here is the execution context, not the test
            # instance; ``cls`` is closed over from the fixture argument.
            if cls is _cursor.CursorFetchStrategy:
                # Nothing to install: the strategy already present on the
                # context is left untouched.
                pass
            elif cls is _cursor.BufferedRowCursorFetchStrategy:
                self.cursor_fetch_strategy = cls(
                    self.cursor, self.execution_options
                )
            elif cls is _cursor.FullyBufferedCursorFetchStrategy:
                # This strategy is handed the cursor description and every
                # row up front, so the cursor is drained here.
                self.cursor_fetch_strategy = cls(
                    self.cursor,
                    self.cursor.description,
                    self.cursor.fetchall(),
                )
            else:
                # Explicit raise rather than ``assert False``: a bare
                # assert is removed under ``python -O``, which would let
                # an unsupported strategy fall through silently.
                raise AssertionError(
                    "unsupported cursor fetch strategy: %r" % (cls,)
                )

    self.patcher = patch.object(
        self.engine.dialect, "execution_ctx_cls", ExcCtx
    )

    # patch.object restores the original attribute when the block exits,
    # including when the caller's body raises.
    with self.patcher:
        yield
| 3256 | |
| 3257 | def _test_proxy(self, cls): |
| 3258 | with self._proxy_fixture(cls): |
no test coverage detected