From 04f97c3c805757517b3e8500663f499c3e51bf50 Mon Sep 17 00:00:00 2001 From: SashegDev Date: Thu, 7 May 2026 17:09:45 +0000 Subject: [PATCH] Server: Add bot protection middleware - Global rate limiting (60 requests/minute per IP) - IP whitelist/blacklist via ALLOWED_IPS and BLOCKED_IPS env vars - Bot detection - silent 404 for suspicious paths (.env, phpinfo, etc.) - Path traversal detection - Reduced noise in logs from bot scanners --- server/main.py | 16 +++++ server/middleware.py | 150 +++++++++++++++++++++++++++++++++++++------ 2 files changed, 147 insertions(+), 19 deletions(-) diff --git a/server/main.py b/server/main.py index 599d180..e4a11cf 100644 --- a/server/main.py +++ b/server/main.py @@ -29,6 +29,22 @@ manifest_cache = TTLCache(maxsize=100, ttl=300) BUILDS_DIR = Path("builds") +# IP Filtering Configuration +import os +import middleware as mw + +# Configure allowed IPs (empty = allow all, set IPs = only these allowed) +ALLOWED_IPS = set(os.environ.get("ALLOWED_IPS", "").split(",")) - {""} +# Configure blocked IPs +BLOCKED_IPS = set(os.environ.get("BLOCKED_IPS", "").split(",")) - {""} + +if ALLOWED_IPS: + logger.info(f"IP whitelist enabled: {len(ALLOWED_IPS)} IPs allowed") +if BLOCKED_IPS: + logger.info(f"IP blacklist enabled: {len(BLOCKED_IPS)} IPs blocked") + +mw.set_ip_config(allowed=ALLOWED_IPS, blocked=BLOCKED_IPS) + @asynccontextmanager async def lifespan(app: FastAPI): diff --git a/server/middleware.py b/server/middleware.py index f42d886..a0bf852 100644 --- a/server/middleware.py +++ b/server/middleware.py @@ -5,43 +5,155 @@ import logging import time import uuid import traceback +from collections import defaultdict +from typing import Optional logger = logging.getLogger(__name__) +# Rate limiting config +RATE_LIMIT_REQUESTS = 60 # Max requests per window +RATE_LIMIT_WINDOW = 60 # Window in seconds +_ip_request_counts: dict[str, list[float]] = defaultdict(list) + +# IP filtering config (set from main.py) +ALLOWED_IPS: set[str] = set() +BLOCKED_IPS: set[str] = set() + +# Suspicious paths that indicate bot scanning +SUSPICIOUS_PATHS = { + ".env", ".env.local", ".env.production", ".env.development", ".env.bak", + ".env.old", ".env.backup", ".env.orig", ".env.save", ".env~", ".env.swp", + ".env.copy", ".env.1", ".ENV", + "appsettings.json", "appsettings.Development.json", "appsettings.Production.json", + "appsettings.Staging.json", "web.config", + "phpinfo.php", "info.php", "test.php", "i.php", "phpi.php", "php.php", + "phptest.php", "server-info.php", "phpinformation.php", "infophp.php", + "php_info.php", "config.php", + "actuator/env", "actuator/configprops", "actuator", + "manage/env", "admin/env", "env", + "actuator/env/aws", "actuator/env/cloud", + "_layouts/15/", "_layouts/15/ToolPane.aspx", + "swagger-ui", "api/docs", "openapi.json", + "wp-admin", "wp-login.php", "wordpress", + "administrator", "phpmyadmin", + ".git", ".svn", ".hg", +} + +# Known client IPs (allow by default for legitimate users) +KNOWN_CLIENT_IPS = { + "127.0.0.1", "localhost", + # Add known client IPs here or leave empty to allow all +} + + +def get_client_ip(request: Request) -> str: + """Extract client IP from request""" + client_ip = request.client.host if request.client else "unknown" + forwarded = request.headers.get("x-forwarded-for") + if forwarded: + client_ip = forwarded.split(",")[0].strip() + return client_ip + + +def is_ip_allowed(client_ip: str) -> tuple[bool, str]: + """Check if IP is allowed""" + if BLOCKED_IPS and client_ip in BLOCKED_IPS: + return False, "blocked" + + if ALLOWED_IPS and client_ip not in ALLOWED_IPS: + return False, "not_whitelisted" + + return True, "allowed" + + +def check_rate_limit(client_ip: str) -> bool: + """Check if IP has exceeded rate limit""" + now = time.time() + + # Clean old requests + _ip_request_counts[client_ip] = [ + t for t in _ip_request_counts[client_ip] + if now - t < RATE_LIMIT_WINDOW + ] + + if len(_ip_request_counts[client_ip]) >= RATE_LIMIT_REQUESTS: + return False + + _ip_request_counts[client_ip].append(now) + return True + + +def is_suspicious_path(path: str) -> bool: + """Check if path is suspicious (bot scanning)""" + path_lower = path.lower() + + # Direct match + if path_lower in SUSPICIOUS_PATHS: + return True + + # Contains suspicious patterns + suspicious_patterns = [ + ".env", "phpinfo", "actuator", "wp-", "phpmyadmin", + ".git", ".svn", "swagger", "openapi", + ] + + for pattern in suspicious_patterns: + if pattern in path_lower: + return True + + # Path traversal attempts + if ".." in path or ".." in path.replace("%2e%2e", "").replace("%252e", ""): + return True + + return False + + +def set_ip_config(allowed: Optional[set[str]] = None, blocked: Optional[set[str]] = None): + """Configure IP filtering (call from main.py)""" + global ALLOWED_IPS, BLOCKED_IPS + if allowed is not None: + ALLOWED_IPS = allowed + if blocked is not None: + BLOCKED_IPS = blocked + + class LoggingMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): - # Generate request ID request_id = str(uuid.uuid4())[:8] + client_ip = get_client_ip(request) - # Get client IP - client_ip = request.client.host if request.client else "unknown" - forwarded = request.headers.get("x-forwarded-for") - if forwarded: - client_ip = forwarded.split(",")[0].strip() + # Check IP allow/block + allowed, reason = is_ip_allowed(client_ip) + if not allowed: + # Silent block - don't give attackers any info + return Response(status_code=404, content="") - # Log incoming request - logger.info(f"→ {request.method} {request.url.path} (IP: {client_ip}, ID: {request_id})") + # Check rate limit (skip for known clients) + if client_ip not in KNOWN_CLIENT_IPS: + if not check_rate_limit(client_ip): + logger.warning(f"Rate limited: {client_ip} ({request.url.path})") + return Response(status_code=429, content="Too many requests") - # Start timer + # Check suspicious path (silent 404 for bots) + path = request.url.path + if is_suspicious_path(path): + # Return 404 without logging - confuse the bots + return Response(status_code=404, content="") + + # Log legitimate requests start_time = time.time() + logger.info(f"→ {request.method} {path} (IP: {client_ip}, ID: {request_id})") + try: response = await call_next(request) - - # Calculate duration duration = (time.time() - start_time) * 1000 - - # Log response - logger.info(f"← {request.method} {request.url.path} → {response.status_code} ({duration:.0f}ms) [ID: {request_id}]") - - # Add request ID to response headers + logger.info(f"← {request.method} {path} → {response.status_code} ({duration:.0f}ms) [ID: {request_id}]") response.headers["X-Request-ID"] = request_id - return response except Exception as e: duration = (time.time() - start_time) * 1000 - # Log full traceback error_traceback = traceback.format_exc() - logger.error(f"✗ {request.method} {request.url.path} → ERROR: {str(e)} (ID: {request_id})\n{error_traceback}") + logger.error(f"✗ {request.method} {path} → ERROR: {str(e)} (ID: {request_id})\n{error_traceback}") raise \ No newline at end of file