(endpoint_name: str)
| 88 | |
| 89 | |
def get_bdbs_config(endpoint_name: str):
    """Return the BDB entry named *endpoint_name* from the BDBs config file.

    The config file path is read from the ``REDIS_BDBS_CONFIG_PATH``
    environment variable. The file is parsed as JSON, and the entries under
    its ``"databases"`` key are searched for the first one whose ``"name"``
    equals ``endpoint_name``.

    Args:
        endpoint_name: Value to match against each entry's ``"name"`` key.

    Returns:
        The first matching entry from ``"databases"``.

    Raises:
        FileNotFoundError: If the environment variable is unset or empty, or
            the path it names does not exist.
        ValueError: If any ``Exception`` is raised while opening, parsing or
            searching the file; the original error is chained as the cause.

    When no entry matches, ``pytest.fail`` is called.
    """
    bdbs_config = os.getenv("REDIS_BDBS_CONFIG_PATH", None)

    # Guard clause: an unset/empty variable and a missing path are reported
    # the same way.
    if not bdbs_config or not os.path.exists(bdbs_config):
        raise FileNotFoundError(f"BDBs config file not found: {bdbs_config}")

    try:
        with open(bdbs_config, "r") as config_file:
            parsed = json.load(config_file)

        # First entry with a matching name, or None when there is none.
        found = next(
            (
                candidate
                for candidate in parsed["databases"]
                if candidate["name"] == endpoint_name
            ),
            None,
        )
        if found is not None:
            return found

        # NOTE(review): kept inside the try block as in the original; whether
        # the handler below intercepts it depends on the exception type
        # pytest.fail raises - confirm against pytest's documentation.
        pytest.fail(f"Failed to find bdb config for {endpoint_name}")
    except Exception as err:
        raise ValueError(f"Failed to load bdbs config file: {bdbs_config}") from err
| 106 | |
| 107 | |
| 108 | @pytest.fixture() |
no test coverage detected