(
self,
current_request: Request,
path: str,
)
| 349 | return payload, "" |
| 350 | |
| 351 | async def _apply_auth_rate_limit( |
| 352 | self, |
| 353 | current_request: Request, |
| 354 | path: str, |
| 355 | ) -> JSONResponse | None: |
| 356 | if ( |
| 357 | os.environ.get("ASTRBOT_TEST_MODE") != "true" |
| 358 | and path in _RATE_LIMITED_ENDPOINTS |
| 359 | ): |
| 360 | rl_config = self.config.get("dashboard", {}).get("auth_rate_limit", {}) |
| 361 | rl_enabled = rl_config.get("enable", True) |
| 362 | if rl_enabled: |
| 363 | average_interval = float(rl_config.get("average_interval", 1.0)) |
| 364 | max_burst = int(rl_config.get("max_burst", 3)) |
| 365 | if average_interval <= 0: |
| 366 | average_interval = 1.0 |
| 367 | if max_burst <= 0: |
| 368 | max_burst = 3 |
| 369 | refill_rate = 1.0 / average_interval |
| 370 | client_ip = self._get_request_client_ip(current_request) |
| 371 | limiter = self._rate_limiter_registry.get_or_create( |
| 372 | client_ip, capacity=max_burst, refill_rate=refill_rate |
| 373 | ) |
| 374 | if not await limiter.acquire(): |
| 375 | r = JSONResponse( |
| 376 | error("验证尝试过于频繁,系统可能正在遭受暴力破解") |
| 377 | ) |
| 378 | r.status_code = 429 |
| 379 | return r |
| 380 | return None |
| 381 | |
| 382 | def _get_request_client_ip(self, current_request) -> str: |
| 383 | if bool(self.config.get("dashboard", {}).get("trust_proxy_headers", False)): |