Server: Add bot protection middleware

- Global rate limiting (60 requests/minute per IP)
- IP whitelist/blacklist via ALLOWED_IPS and BLOCKED_IPS env vars
- Bot detection - silent 404 for suspicious paths (.env, phpinfo, etc.)
- Path traversal detection
- Reduced noise in logs from bot scanners
This commit is contained in:
SashegDev
2026-05-07 17:09:45 +00:00
parent f40cf7afed
commit 04f97c3c80
2 changed files with 147 additions and 19 deletions
+16
View File
@@ -29,6 +29,22 @@ manifest_cache = TTLCache(maxsize=100, ttl=300)
BUILDS_DIR = Path("builds")
# IP Filtering Configuration
import os
import middleware as mw
# Configure allowed IPs (empty = allow all, set IPs = only these allowed)
ALLOWED_IPS = set(os.environ.get("ALLOWED_IPS", "").split(",")) - {""}
# Configure blocked IPs
BLOCKED_IPS = set(os.environ.get("BLOCKED_IPS", "").split(",")) - {""}
if ALLOWED_IPS:
logger.info(f"IP whitelist enabled: {len(ALLOWED_IPS)} IPs allowed")
if BLOCKED_IPS:
logger.info(f"IP blacklist enabled: {len(BLOCKED_IPS)} IPs blocked")
mw.set_ip_config(allowed=ALLOWED_IPS, blocked=BLOCKED_IPS)
@asynccontextmanager
async def lifespan(app: FastAPI):
+130 -18
View File
@@ -5,43 +5,155 @@ import logging
import time
import uuid
import traceback
from collections import defaultdict
from typing import Optional
logger = logging.getLogger(__name__)
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
# Generate request ID
request_id = str(uuid.uuid4())[:8]
# Rate limiting config
RATE_LIMIT_REQUESTS = 60 # Max requests per window
RATE_LIMIT_WINDOW = 60 # Window in seconds
_ip_request_counts: dict[str, list[float]] = defaultdict(list)
# Get client IP
# IP filtering config (set from main.py)
ALLOWED_IPS: set[str] = set()
BLOCKED_IPS: set[str] = set()
# Suspicious paths that indicate bot scanning
SUSPICIOUS_PATHS = {
".env", ".env.local", ".env.production", ".env.development", ".env.bak",
".env.old", ".env.backup", ".env.orig", ".env.save", ".env~", ".env.swp",
".env.copy", ".env.1", ".ENV",
"appsettings.json", "appsettings.Development.json", "appsettings.Production.json",
"appsettings.Staging.json", "web.config",
"phpinfo.php", "info.php", "test.php", "i.php", "phpi.php", "php.php",
"phptest.php", "server-info.php", "phpinformation.php", "infophp.php",
"php_info.php", "config.php",
"actuator/env", "actuator/configprops", "actuator",
"manage/env", "admin/env", "env",
"actuator/env/aws", "actuator/env/cloud",
"_layouts/15/", "_layouts/15/ToolPane.aspx",
"swagger-ui", "api/docs", "openapi.json",
"wp-admin", "wp-login.php", "wordpress",
"administrator", "phpmyadmin",
".git", ".svn", ".hg",
}
# Known client IPs (allow by default for legitimate users)
KNOWN_CLIENT_IPS = {
"127.0.0.1", "localhost",
# Add known client IPs here or leave empty to allow all
}
def get_client_ip(request: Request) -> str:
"""Extract client IP from request"""
client_ip = request.client.host if request.client else "unknown"
forwarded = request.headers.get("x-forwarded-for")
if forwarded:
client_ip = forwarded.split(",")[0].strip()
return client_ip
# Log incoming request
logger.info(f"{request.method} {request.url.path} (IP: {client_ip}, ID: {request_id})")
# Start timer
def is_ip_allowed(client_ip: str) -> tuple[bool, str]:
"""Check if IP is allowed"""
if BLOCKED_IPS and client_ip in BLOCKED_IPS:
return False, "blocked"
if ALLOWED_IPS and client_ip not in ALLOWED_IPS:
return False, "not_whitelisted"
return True, "allowed"
def check_rate_limit(client_ip: str) -> bool:
"""Check if IP has exceeded rate limit"""
now = time.time()
# Clean old requests
_ip_request_counts[client_ip] = [
t for t in _ip_request_counts[client_ip]
if now - t < RATE_LIMIT_WINDOW
]
if len(_ip_request_counts[client_ip]) >= RATE_LIMIT_REQUESTS:
return False
_ip_request_counts[client_ip].append(now)
return True
def is_suspicious_path(path: str) -> bool:
"""Check if path is suspicious (bot scanning)"""
path_lower = path.lower()
# Direct match
if path_lower in SUSPICIOUS_PATHS:
return True
# Contains suspicious patterns
suspicious_patterns = [
".env", "phpinfo", "actuator", "wp-", "phpmyadmin",
".git", ".svn", "swagger", "openapi",
]
for pattern in suspicious_patterns:
if pattern in path_lower:
return True
# Path traversal attempts
if ".." in path or ".." in path.replace("%2e%2e", "").replace("%252e", ""):
return True
return False
def set_ip_config(allowed: Optional[set[str]] = None, blocked: Optional[set[str]] = None):
"""Configure IP filtering (call from main.py)"""
global ALLOWED_IPS, BLOCKED_IPS
if allowed is not None:
ALLOWED_IPS = allowed
if blocked is not None:
BLOCKED_IPS = blocked
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
request_id = str(uuid.uuid4())[:8]
client_ip = get_client_ip(request)
# Check IP allow/block
allowed, reason = is_ip_allowed(client_ip)
if not allowed:
# Silent block - don't give attackers any info
return Response(status_code=404, content="")
# Check rate limit (skip for known clients)
if client_ip not in KNOWN_CLIENT_IPS:
if not check_rate_limit(client_ip):
logger.warning(f"Rate limited: {client_ip} ({request.url.path})")
return Response(status_code=429, content="Too many requests")
# Check suspicious path (silent 404 for bots)
path = request.url.path
if is_suspicious_path(path):
# Return 404 without logging - confuse the bots
return Response(status_code=404, content="")
# Log legitimate requests
start_time = time.time()
logger.info(f"{request.method} {path} (IP: {client_ip}, ID: {request_id})")
try:
response = await call_next(request)
# Calculate duration
duration = (time.time() - start_time) * 1000
# Log response
logger.info(f"{request.method} {request.url.path}{response.status_code} ({duration:.0f}ms) [ID: {request_id}]")
# Add request ID to response headers
logger.info(f"{request.method} {path}{response.status_code} ({duration:.0f}ms) [ID: {request_id}]")
response.headers["X-Request-ID"] = request_id
return response
except Exception as e:
duration = (time.time() - start_time) * 1000
# Log full traceback
error_traceback = traceback.format_exc()
logger.error(f"{request.method} {request.url.path} → ERROR: {str(e)} (ID: {request_id})\n{error_traceback}")
logger.error(f"{request.method} {path} → ERROR: {str(e)} (ID: {request_id})\n{error_traceback}")
raise