Server: Auto-load public IP blocklists
- Load known bad IPs from FireHOL blocklists on startup - ~4400 IPs blocked by default - Set PUBLIC_BLOCKLIST=false to disable - Combined with manual BLOCKED_IPS env var
This commit is contained in:
+9
-2
@@ -33,11 +33,18 @@ BUILDS_DIR = Path("builds")
|
|||||||
import os
|
import os
|
||||||
import middleware as mw
|
import middleware as mw
|
||||||
|
|
||||||
# Configure blocked IPs only
|
# Manually blocked IPs
|
||||||
BLOCKED_IPS = set(os.environ.get("BLOCKED_IPS", "").split(",")) - {""}
|
BLOCKED_IPS = set(os.environ.get("BLOCKED_IPS", "").split(",")) - {""}
|
||||||
|
|
||||||
|
# Load public blocklists (set to "false" to disable)
|
||||||
|
USE_PUBLIC_BLOCKLIST = os.environ.get("PUBLIC_BLOCKLIST", "true").lower() == "true"
|
||||||
|
|
||||||
|
if USE_PUBLIC_BLOCKLIST:
|
||||||
|
public_ips = mw.load_public_blocklists()
|
||||||
|
BLOCKED_IPS.update(public_ips)
|
||||||
|
|
||||||
if BLOCKED_IPS:
|
if BLOCKED_IPS:
|
||||||
logger.info(f"IP blacklist enabled: {len(BLOCKED_IPS)} IPs blocked")
|
logger.info(f"Total blocked IPs: {len(BLOCKED_IPS)}")
|
||||||
|
|
||||||
mw.set_ip_config(blocked=BLOCKED_IPS)
|
mw.set_ip_config(blocked=BLOCKED_IPS)
|
||||||
|
|
||||||
|
|||||||
@@ -5,11 +5,48 @@ import logging
|
|||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
import traceback
|
import traceback
|
||||||
|
import httpx
|
||||||
|
import re
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Public blocklist URLs
|
||||||
|
BLOCKLIST_URLS = [
|
||||||
|
"https://raw.githubusercontent.com/firehol/blocklist-ipsets/master/firehol_level1.netset",
|
||||||
|
"https://raw.githubusercontent.com/firehol/blocklist-ipsets/master/iblocklist_isp.netset",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def load_blocklist_from_url(url: str, timeout: int = 10) -> set[str]:
|
||||||
|
"""Download and parse IP blocklist from URL"""
|
||||||
|
ips = set()
|
||||||
|
try:
|
||||||
|
response = httpx.get(url, timeout=timeout, follow_redirects=True)
|
||||||
|
if response.status_code == 200:
|
||||||
|
for line in response.text.splitlines():
|
||||||
|
line = line.strip()
|
||||||
|
if not line or line.startswith("#"):
|
||||||
|
continue
|
||||||
|
if re.match(r"^\d+\.\d+\.\d+\.\d+(/\d+)?$", line):
|
||||||
|
ip = line.split("/")[0]
|
||||||
|
ips.add(ip)
|
||||||
|
logger.info(f"Loaded {len(ips)} IPs from blocklist: {url}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Failed to load blocklist from {url}: {e}")
|
||||||
|
return ips
|
||||||
|
|
||||||
|
|
||||||
|
def load_public_blocklists() -> set[str]:
|
||||||
|
"""Load all public blocklists"""
|
||||||
|
all_ips = set()
|
||||||
|
for url in BLOCKLIST_URLS:
|
||||||
|
all_ips.update(load_blocklist_from_url(url))
|
||||||
|
logger.info(f"Total blocked IPs from public lists: {len(all_ips)}")
|
||||||
|
return all_ips
|
||||||
|
|
||||||
|
|
||||||
# Rate limiting config
|
# Rate limiting config
|
||||||
RATE_LIMIT_REQUESTS = 60 # Max requests per window
|
RATE_LIMIT_REQUESTS = 60 # Max requests per window
|
||||||
RATE_LIMIT_WINDOW = 60 # Window in seconds
|
RATE_LIMIT_WINDOW = 60 # Window in seconds
|
||||||
|
|||||||
Reference in New Issue
Block a user