Create a connection for test. Calling this function will verify that the connectivity with the database works as expected. However the connection will not be stored in the pool. Close the pool, and raise `PoolTimeout`, if not ready within *timeout*
(self, timeout: float = 30.0)
| 72 | ) |
| 73 | |
| 74 | def wait(self, timeout: float = 30.0) -> None: |
| 75 | """ |
| 76 | Create a connection for test. |
| 77 | |
| 78 | Calling this function will verify that the connectivity with the |
| 79 | database works as expected. However the connection will not be stored |
| 80 | in the pool. |
| 81 | |
| 82 | Close the pool, and raise `PoolTimeout`, if not ready within *timeout* |
| 83 | sec. |
| 84 | """ |
| 85 | self._check_open_getconn() |
| 86 | |
| 87 | with self._lock: |
| 88 | assert not self._pool_full_event |
| 89 | self._pool_full_event = Event() |
| 90 | |
| 91 | logger.info("waiting for pool %r initialization", self.name) |
| 92 | self.run_task(AddConnection(self)) |
| 93 | if not self._pool_full_event.wait(timeout): |
| 94 | self.close() # stop all the tasks |
| 95 | raise PoolTimeout(f"pool initialization incomplete after {timeout} sec") |
| 96 | |
| 97 | with self._lock: |
| 98 | assert self._pool_full_event |
| 99 | self._pool_full_event = None |
| 100 | |
| 101 | logger.info("pool %r is ready to use", self.name) |
| 102 | |
| 103 | def _get_ready_connection(self, timeout: float | None) -> CT | None: |
| 104 | if timeout is not None and timeout <= 0.0: |
nothing calls this directly
no test coverage detected