Handler for (sync) connection in pipeline mode.
| 38 | |
| 39 | |
class Pipeline(BasePipeline):
    """Handler for (sync) connection in pipeline mode."""

    __module__ = "psycopg"
    _conn: Connection[Any]

    def __init__(self, conn: Connection[Any], _no_lock: bool = False) -> None:
        """Bind the pipeline to *conn*.

        When `_no_lock` is true a `_DummyLock` is used in place of
        `conn.lock`.
        """
        super().__init__(conn)
        self._lock = _DummyLock() if _no_lock else conn.lock

    def sync(self) -> None:
        """Sync the pipeline, send any pending command and receive and process
        all available results.

        Exceptions matching `e._NO_TRACEBACK` are re-raised with their
        traceback reset.
        """
        try:
            with self._lock:
                self._conn.wait(self._sync_gen())
        except e._NO_TRACEBACK as ex:
            raise ex.with_traceback(None)

    def __enter__(self) -> Self:
        """Run `_enter_gen()` on the connection under the lock; return self."""
        with self._lock:
            self._conn.wait(self._enter_gen())
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Run `_exit_gen()` on the connection under the lock.

        If that step fails while the `with` block itself raised, the failure
        is only logged, so the block's exception is the one that propagates.
        If the block did not raise, the failure is re-raised with its
        traceback reset. `_exit(exc_val)` is called in every case.
        """
        try:
            with self._lock:
                self._conn.wait(self._exit_gen())
        except Exception as exc2:
            # Don't clobber an exception raised in the block with this one.
            # Identity test rather than truthiness: an exception class may
            # define __bool__/__len__ and evaluate as false.
            if exc_val is not None:
                logger.warning("error ignored terminating %r: %s", self, exc2)
            else:
                raise exc2.with_traceback(None)
        finally:
            self._exit(exc_val)
no outgoing calls
no test coverage detected