(endpoint_name: str)
| 71 | |
| 72 | |
def get_endpoints_config(endpoint_name: str):
    """Return the entry stored under ``endpoint_name`` in the endpoints file.

    The file location is read from the ``REDIS_ENDPOINTS_CONFIG_PATH``
    environment variable and the file is parsed as JSON.

    Args:
        endpoint_name: Top-level key to look up in the parsed JSON document.

    Returns:
        The value found under ``endpoint_name`` in the parsed document.

    Raises:
        FileNotFoundError: If the environment variable is unset or empty, or
            the path it names does not exist.
        ValueError: If the file cannot be read or parsed, or if it has no
            entry for ``endpoint_name``. The underlying error is chained as
            ``__cause__``.
    """
    endpoints_config = os.getenv("REDIS_ENDPOINTS_CONFIG_PATH")

    if not (endpoints_config and os.path.exists(endpoints_config)):
        raise FileNotFoundError(f"Endpoints config file not found: {endpoints_config}")

    # Loading and lookup are handled separately so that a missing endpoint
    # name is not reported as a failure to load the file.
    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(endpoints_config, "r", encoding="utf-8") as f:
            data = json.load(f)
    except Exception as e:
        # Deliberately broad: every read/parse failure keeps surfacing as the
        # ValueError callers already handle.
        raise ValueError(
            f"Failed to load endpoints config file: {endpoints_config}"
        ) from e

    try:
        return data[endpoint_name]
    except (KeyError, IndexError, TypeError) as e:
        # KeyError: name absent from the mapping. IndexError/TypeError: the
        # document's top level is not a mapping that can be indexed by name.
        raise ValueError(
            f"Endpoint {endpoint_name!r} not found in endpoints config file: "
            f"{endpoints_config}"
        ) from e
| 88 | |
| 89 | |
| 90 | def get_bdbs_config(endpoint_name: str): |
no test coverage detected