Security middleware to validate requests.
(request: Request, call_next: Callable)
| 48 | |
| 49 | |
| 50 | async def security_middleware(request: Request, call_next: Callable): |
| 51 | """Security middleware to validate requests.""" |
| 52 | # Validate content type for JSON endpoints |
| 53 | if request.url.path.startswith("/api/") and request.method in ["POST", "PUT", "PATCH"]: |
| 54 | content_type = request.headers.get("content-type", "").lower() |
| 55 | if not content_type.startswith("application/json") and request.method != "GET": |
| 56 | # Skip validation for file uploads |
| 57 | if not content_type.startswith("multipart/form-data"): |
| 58 | raise HTTPException( |
| 59 | status_code=400, |
| 60 | detail="Content-Type must be application/json for API endpoints" |
| 61 | ) |
| 62 | |
| 63 | # Validate file paths to prevent path traversal |
| 64 | # Check URL path for suspicious patterns |
| 65 | path = request.url.path |
| 66 | if ".." in path or "./" in path: |
| 67 | # Use a more thorough check |
| 68 | if re.search(r"(\.{2}[/\\])|([/\\]\.{2})", path): |
| 69 | logger = get_server_logger() |
| 70 | logger.log_security_event( |
| 71 | "PATH_TRAVERSAL_ATTEMPT", |
| 72 | f"Suspicious path detected: {path}", |
| 73 | correlation_id=getattr(request.state, 'correlation_id', str(uuid.uuid4())) |
| 74 | ) |
| 75 | raise HTTPException(status_code=400, detail="Invalid path") |
| 76 | |
| 77 | response = await call_next(request) |
| 78 | return response |
| 79 | |
| 80 | |
| 81 | async def rate_limit_middleware(request: Request, call_next: Callable): |
nothing calls this directly
no test coverage detected