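Adds a new database to the database list. Args: config: DatabaseConfig object that contains the database configuration. skip_initial_health_check: If True (the default), the database is added even if its initial health check fails; if False, the UnhealthyDatabaseException from the failed check is raised and the database is not added.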
(
self, config: DatabaseConfig, skip_initial_health_check: bool = True
)
| 176 | ) |
| 177 | |
async def add_database(
    self, config: DatabaseConfig, skip_initial_health_check: bool = True
):
    """
    Build a database from ``config`` and register it in the database list.

    A client is created from the first available source in ``config``:
    ``from_url``, then ``from_pool``, otherwise plain ``client_kwargs``.
    The client is wrapped in a ``Database`` together with its circuit
    breaker, weight and health check URL, health-checked once, added to
    the database list, and finally passed to ``_change_active_database``
    along with the database that had the highest weight before the
    addition.

    Args:
        config: DatabaseConfig object describing the database to add.
            Its ``client_kwargs`` (and ``from_pool``, when set) are
            modified in place to carry a zero-retry policy.
        skip_initial_health_check: When True, an
            UnhealthyDatabaseException raised by the initial health check
            is suppressed and the database is added anyway. When False,
            that exception propagates and the database is not added.

    Raises:
        UnhealthyDatabaseException: If the initial health check fails and
            ``skip_initial_health_check`` is False.
    """
    # Lower level clients must not retry on their own; global retries are
    # handled by command_retry, so a zero-retry policy is forced here.
    config.client_kwargs.update(retry=Retry(retries=0, backoff=NoBackoff()))

    client_cls = self._config.client_class
    if config.from_url:
        client = client_cls.from_url(config.from_url, **config.client_kwargs)
    elif config.from_pool:
        # A pool carries its own retry policy, so it is overridden as well.
        config.from_pool.set_retry(Retry(retries=0, backoff=NoBackoff()))
        client = client_cls.from_pool(connection_pool=config.from_pool)
    else:
        client = client_cls(**config.client_kwargs)

    # Use the configured circuit breaker, or the config's default one.
    circuit = config.circuit
    if circuit is None:
        circuit = config.default_circuit_breaker()

    database = Database(
        client=client,
        circuit=circuit,
        weight=config.weight,
        health_check_url=config.health_check_url,
    )

    try:
        await self._check_db_health(database)
    except UnhealthyDatabaseException:
        # An unhealthy database is only tolerated when the caller opted in.
        if not skip_initial_health_check:
            raise

    # Capture the top-weighted database before the new one is inserted.
    previous_top_db, _ = self._databases.get_top_n(1)[0]
    self._databases.add(database, database.weight)
    await self._change_active_database(database, previous_top_db)
| 226 | |
| 227 | async def _change_active_database( |
| 228 | self, new_database: AsyncDatabase, highest_weight_database: AsyncDatabase |
nothing calls this directly
no test coverage detected