import re
import logging
import os
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse
from typing import Optional
import httpx
import json
import structlog
from cachetools import TTLCache
from fastapi import Depends, FastAPI, HTTPException, Request, Response
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
from uvicorn.protocols.http.httptools_impl import HttpToolsProtocol
# Disable httpx debug logging
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
from pack_manager import DATA_DIR, scan_pack, get_cached_manifest, PACKS_DIR
from models import PackMeta
from middleware import LoggingMiddleware
from cli import parse_args, run_test_mode, run_production_mode, run_development_mode, run_sync_mode
from log_manager import init_logging
from auth import get_current_user, router as auth_router, init_db, verify_jwt
from roles import Permissions, has_permission
from admin_router import router as admin_router
from friends import router as friends_router, init_friends_db
from playtime import router as playtime_router, init_playtime_db
import asyncio
import hashlib
import aiofiles
import mimetypes
# Module-wide structured logger.
logger = structlog.get_logger(__name__)
# Cache for manifests - expires after 5 minutes
manifest_cache = TTLCache(maxsize=100, ttl=300)
# Launcher builds live in builds/; extracted per-version trees live in builds/versions/.
BUILDS_DIR = Path("builds")
VERSIONS_DIR = BUILDS_DIR / "versions"
# Mirror configuration (name -> base URL), exposed via /launcher/mirrors and embedded in builds/meta.json
LAUNCHER_MIRRORS = {
    "main": "http://87.120.187.36:1582",
    "mirror-1": "http://212.22.82.243:1582",
}
# Server role: "main" or "mirror"
SERVER_ROLE = os.environ.get("SERVER_ROLE", "main")
MAIN_SERVER_URL = os.environ.get("MAIN_SERVER_URL", "") # For mirrors to sync from
# NOTE(review): both keys below fall back to hard-coded defaults that are visible in the
# source; deployments should always override them through the environment.
SYNC_API_KEY = os.environ.get("SYNC_API_KEY", "changeme") # API key for mirror sync
MASTER_KEY = os.environ.get("MASTER_KEY", "sashegdevsupeddevepta") # Master key for admin/mirror
# IP Filtering Configuration
import os
import middleware as mw
# Only configure manually blocked IPs at import time
# Public blocklists are loaded in lifespan (once, not per-worker)
# BLOCKED_IPS is a comma-separated list; empty entries are discarded.
MANUAL_BLOCKED_IPS = set(os.environ.get("BLOCKED_IPS", "").split(",")) - {""}
# Cache file for blocklist (load once)
BLOCKLIST_CACHE_FILE = Path("data/blocklist_cache.txt")
def _read_blocklist_cache() -> set:
    """Return the IPs stored in the on-disk blocklist cache (empty set if unavailable)."""
    if not BLOCKLIST_CACHE_FILE.exists():
        return set()
    try:
        return set(BLOCKLIST_CACHE_FILE.read_text().strip().splitlines())
    except Exception as e:
        logger.warning(f"Failed to load blocklist cache: {e}")
        return set()


def _download_blocklist_once() -> set:
    """Download the public blocklists if this process wins the inter-process lock.

    Returns the downloaded IPs, or an empty set when another process holds the
    lock, the download yields nothing, or locking is unavailable.
    """
    downloaded = set()
    try:
        DATA_DIR.mkdir(exist_ok=True)
        BLOCKLIST_CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
        # fcntl is POSIX-only; an ImportError is reported by the handler below.
        import fcntl
        with open(DATA_DIR / "blocklist.lock", 'w') as lock_fd:
            try:
                # Non-blocking: only one worker performs the download.
                fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except BlockingIOError:
                # Another process is downloading; the caller re-reads the cache.
                return downloaded
            downloaded = set(mw.load_public_blocklists() or ())
            if downloaded:
                BLOCKLIST_CACHE_FILE.write_text("\n".join(downloaded))
                logger.info(f"Downloaded and saved {len(downloaded)} IPs to blocklist cache")
    except Exception as e:
        logger.warning(f"Lock error: {e}")
    return downloaded


def _resolve_inside_builds(rel_path) -> Optional[Path]:
    """Map a manifest-relative path into BUILDS_DIR, or return None if it would escape it."""
    base = BUILDS_DIR.resolve()
    try:
        candidate = (base / str(rel_path)).resolve()
        candidate.relative_to(base)
    except (ValueError, OSError):
        return None
    # The builds root itself is never a valid file target.
    return None if candidate == base else candidate


async def _apply_sync_manifest(client, main_version, sync_data, headers, *, announce, done_label):
    """Download missing files and remove deleted ones as listed in a sync manifest.

    Paths come from the remote server, so each one is confined to BUILDS_DIR and
    a file is only written when the download answered with HTTP 200.
    """
    for entry in sync_data.get("files", []):
        rel = entry["path"]
        target = _resolve_inside_builds(rel)
        if target is None:
            logger.warning(f"Skipping unsafe sync path: {rel}")
            continue
        if target.exists():
            continue
        if announce:
            logger.info(f"Syncing: {rel}")
        file_url = f"{MAIN_SERVER_URL}/launcher/sync/{main_version}/file/{rel}"
        resp = await client.get(file_url, headers=headers)
        if resp.status_code != 200:
            # Never persist an error body as if it were the build file.
            logger.warning(f"Failed to download {rel}: HTTP {resp.status_code}")
            continue
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(resp.content)
        logger.debug(f"{done_label}: {rel}")
    for deleted_file in sync_data.get("delete", []):
        target = _resolve_inside_builds(deleted_file)
        if target is None:
            logger.warning(f"Skipping unsafe delete path: {deleted_file}")
            continue
        if target.is_file():
            target.unlink()
            logger.info(f"Deleted: {deleted_file}")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: start-up initialisation, then shutdown clean-up.

    Start-up loads the IP blocklist, creates the working directories,
    initialises the databases, scans packs, optionally syncs a mirror from the
    main server, prepares launcher metadata and creates the shared proxy
    client. In test mode only the test routine runs. On shutdown the periodic
    sync task (if any) is cancelled and the proxy client is closed.
    """
    args = parse_args()
    # Initialize logging
    init_logging()

    # Load public blocklists (single worker downloads, others read the cache)
    use_public_blocklist = os.environ.get("PUBLIC_BLOCKLIST", "true").lower() == "true"
    all_blocked = set(MANUAL_BLOCKED_IPS)
    if use_public_blocklist:
        cached_ips = _read_blocklist_cache()
        if cached_ips:
            logger.info(f"Loaded {len(cached_ips)} IPs from blocklist cache")
        else:
            cached_ips = _download_blocklist_once()
            if not cached_ips:
                # Another worker may have written the cache in the meantime.
                cached_ips = _read_blocklist_cache()
                if cached_ips:
                    logger.info(f"Loaded {len(cached_ips)} IPs from blocklist cache (after wait)")
        all_blocked.update(cached_ips)
    mw.set_ip_config(blocked=all_blocked)
    logger.info(f"IP blocklist loaded: {len(all_blocked)} IPs")

    # Determine environment
    if args.test:
        env = "test"
    elif args.dev:
        env = "development"
    else:
        env = "production"
    logger.info(f"Starting ZernMC Launcher Server (environment: {env})")

    # Create directories if they don't exist
    BUILDS_DIR.mkdir(exist_ok=True)
    PACKS_DIR.mkdir(exist_ok=True)
    DATA_DIR.mkdir(exist_ok=True)
    init_db()
    init_friends_db()
    init_playtime_db()

    if args.test:
        await run_test_mode()
        yield
        return

    logger.info("Scanning packs on startup...")
    pack_dirs = [p for p in PACKS_DIR.iterdir() if p.is_dir()]
    if not pack_dirs:
        logger.warning(f"No packs found in directory: {PACKS_DIR}")
    else:
        for pack_dir in pack_dirs:
            try:
                meta = await scan_pack(pack_dir.name)
                logger.info(f"Pack scanned successfully: {pack_dir.name} v{meta.version} ({len(meta.files)} files)")
            except Exception as e:
                logger.error(f"Failed to scan pack: {pack_dir.name} - {e}", exc_info=True)
    logger.info("All packs ready. Server is running.")

    is_mirror = SERVER_ROLE == "mirror" and bool(MAIN_SERVER_URL)

    # Mirror sync with main server
    if is_mirror:
        logger.info(f"Mirror mode: syncing from {MAIN_SERVER_URL}")
        try:
            async with httpx.AsyncClient() as client:
                resp = await client.get(f"{MAIN_SERVER_URL}/launcher/version")
                main_version = resp.json().get("version")
                logger.info(f"Main server version: {main_version}")
                # Get sync manifest with API key
                headers = {"X-Sync-Key": SYNC_API_KEY}
                resp = await client.get(f"{MAIN_SERVER_URL}/launcher/sync/{main_version}", headers=headers)
                if resp.status_code != 200:
                    logger.warning(f"Sync failed: {resp.status_code} - {resp.text}")
                    raise Exception(f"Sync auth failed: {resp.status_code}")
                sync_data = resp.json()
                logger.info(f"Need to sync {len(sync_data.get('files', []))} files")
                await _apply_sync_manifest(
                    client, main_version, sync_data, headers,
                    announce=True, done_label="Downloaded",
                )
            logger.info("Mirror sync complete")
        except Exception as e:
            logger.warning(f"Mirror sync failed: {e}")

    # Scan launcher versions and generate meta
    logger.info("Scanning launcher versions...")
    # Generate meta.json in builds/ directory
    logger.info("Generating launcher meta...")
    generate_launcher_builds_meta()
    # Extract new format ZIPs to versions directory
    logger.info("Extracting new format versions...")
    extract_new_format_versions()
    launcher_versions = get_launcher_versions()
    if launcher_versions:
        latest = launcher_versions[0]
        logger.info(f"Launcher meta ready: v{latest['meta']['version']} ({len(latest['meta']['files'])} files)")
    else:
        logger.warning("No launcher versions found in new format")
    logger.info("Launcher meta system ready.")

    # Initialize proxy client
    global proxy_client
    proxy_client = httpx.AsyncClient(timeout=60.0, follow_redirects=True)

    # Start background sync task for mirrors
    sync_task = None
    if is_mirror:
        async def periodic_sync():
            sync_interval = 7200  # 2 hours
            while True:
                await asyncio.sleep(sync_interval)
                try:
                    logger.info("Periodic mirror sync started...")
                    headers = {"X-Sync-Key": SYNC_API_KEY}
                    resp = await proxy_client.get(f"{MAIN_SERVER_URL}/launcher/version")
                    main_version = resp.json().get("version")
                    resp = await proxy_client.get(
                        f"{MAIN_SERVER_URL}/launcher/sync/{main_version}",
                        headers=headers
                    )
                    if resp.status_code != 200:
                        logger.warning(f"Periodic sync failed: {resp.status_code}")
                        continue
                    sync_data = resp.json()
                    logger.info(f"Periodic sync: {len(sync_data.get('files', []))} files")
                    await _apply_sync_manifest(
                        proxy_client, main_version, sync_data, headers,
                        announce=False, done_label="Synced",
                    )
                    logger.info("Periodic mirror sync complete")
                except Exception as e:
                    logger.warning(f"Periodic sync error: {e}")

        # Keep a reference: the event loop only holds weak references to tasks.
        sync_task = asyncio.create_task(periodic_sync())

    try:
        yield
    finally:
        # Stop the background sync before closing the client it uses.
        if sync_task is not None:
            sync_task.cancel()
            try:
                await sync_task
            except asyncio.CancelledError:
                pass
        # Cleanup proxy client
        if proxy_client:
            await proxy_client.aclose()
        logger.info("Server shutting down...")
# ====================== ACTIVATION PAGE TEMPLATE ======================
# Markup returned verbatim by the /activate-pass endpoint.
ACTIVATE_PASS_HTML = """
Активация проходки | ZernMC
ZernMC
Активация проходки
"""
# Create app with lifespan
app = FastAPI(title="ZernMC Launcher Server", lifespan=lifespan)
# Add Logging middleware
app.add_middleware(LoggingMiddleware)
# ====================== ОПТИМИЗАЦИЯ ЗАГРУЗКИ ФАЙЛОВ ======================
class CacheControlMiddleware:
    """Pass-through ASGI middleware kept as the hook for cache-header handling.

    The previous implementation built a ``send`` wrapper that was never
    installed (and did not follow the ASGI ``send(message)`` signature), so the
    middleware has always forwarded every request unchanged. That dead code is
    removed here; behaviour is identical. Per-file ``Cache-Control`` and
    ``ETag`` headers are set by ``send_file_async`` instead, which knows the
    actual file being served.
    """

    def __init__(self, app):
        # Wrapped ASGI application.
        self.app = app

    async def __call__(self, scope, receive, send):
        """Forward the ASGI call to the wrapped application without modification."""
        await self.app(scope, receive, send)
# Register the cache-control middleware on the application.
app.add_middleware(CacheControlMiddleware)
# Most recently computed ETag per file path (entries expire after one hour).
file_etag_cache = TTLCache(maxsize=1000, ttl=3600)


async def get_etag_for_file(file_path: Path) -> str:
    """Return the ETag for ``file_path``, derived from its size and mtime.

    The value is recomputed from a fresh ``stat`` on every call. Previously a
    cached entry was returned for up to an hour, so a file replaced on disk
    kept its old ETag. The cache is still updated with the latest value.

    Raises:
        OSError: if the file cannot be stat'ed.
    """
    stat = file_path.stat()
    etag = f'"{stat.st_size}-{stat.st_mtime}"'
    file_etag_cache[str(file_path)] = etag
    return etag
async def send_file_async(
    file_path: Path,
    request: Request,
    content_type: Optional[str] = None,
    cache: bool = True
):
    """Stream a file to the client with single-range (resumable download) support.

    Args:
        file_path: File to serve.
        request: Incoming request; its ``Range`` header is honoured.
        content_type: Media type; guessed from the file name when omitted.
        cache: When true the response is cacheable for 24 hours, otherwise ``no-cache``.

    Returns:
        A ``StreamingResponse``: 206 for a satisfiable ``bytes=`` range, 200 otherwise.
        A malformed or multi-part ``Range`` header is ignored and the full file is sent.

    Raises:
        HTTPException: 404 when the file does not exist, 416 when the requested
            range lies outside the file.
    """
    if not file_path.exists():
        raise HTTPException(404, "File not found")
    file_size = file_path.stat().st_size
    # Determine content type
    if content_type is None:
        content_type = mimetypes.guess_type(str(file_path))[0] or "application/octet-stream"
    cache_control = "public, max-age=86400" if cache else "no-cache"
    etag = await get_etag_for_file(file_path)
    chunk_size = 65536  # 64KB chunks

    # Parse the Range header (for resumable downloads)
    byte_range = None
    range_header = request.headers.get("range")
    if range_header:
        match = re.fullmatch(r"bytes=(\d*)-(\d*)", range_header.strip())
        if match and any(match.groups()):
            first, last = match.groups()
            unsatisfiable = False
            if first:
                start = int(first)
                end = file_size - 1
                if last:
                    end = min(int(last), file_size - 1)
                if last and int(last) < start:
                    # Invalid range spec: ignore the header and send the full file.
                    start = end = None
                elif start >= file_size:
                    unsatisfiable = True
            else:
                # Suffix form "bytes=-N": the last N bytes of the file.
                suffix_len = int(last)
                start = max(file_size - suffix_len, 0)
                end = file_size - 1
                unsatisfiable = suffix_len == 0 or file_size == 0
            if unsatisfiable:
                raise HTTPException(
                    416,
                    "Requested range not satisfiable",
                    headers={"Content-Range": f"bytes */{file_size}"},
                )
            if start is not None:
                byte_range = (start, end)

    if byte_range is not None:
        start, end = byte_range
        content_length = end - start + 1

        async def range_iterator():
            # Stream the requested window instead of loading it into memory at once.
            remaining = content_length
            async with aiofiles.open(file_path, "rb") as f:
                await f.seek(start)
                while remaining > 0:
                    chunk = await f.read(min(chunk_size, remaining))
                    if not chunk:
                        break
                    remaining -= len(chunk)
                    yield chunk

        # Return 206 Partial Content
        return StreamingResponse(
            range_iterator(),
            status_code=206,
            media_type=content_type,
            headers={
                "Content-Range": f"bytes {start}-{end}/{file_size}",
                "Accept-Ranges": "bytes",
                "Content-Length": str(content_length),
                "Cache-Control": cache_control,
                "ETag": etag,
            }
        )

    # Return full file with streaming
    async def file_iterator():
        async with aiofiles.open(file_path, "rb") as f:
            while True:
                chunk = await f.read(chunk_size)
                if not chunk:
                    break
                yield chunk

    return StreamingResponse(
        file_iterator(),
        media_type=content_type,
        headers={
            "Accept-Ranges": "bytes",
            "Content-Length": str(file_size),
            "Cache-Control": cache_control,
            "ETag": etag,
            "X-Content-Type-Options": "nosniff",
        }
    )
# Register routers (auth, admin, friends and playtime sub-applications)
app.include_router(auth_router)
app.include_router(admin_router)
app.include_router(friends_router)
app.include_router(playtime_router)
# Monkey patch to catch invalid HTTP requests
original_data_received = HttpToolsProtocol.data_received


def patched_data_received(self, data):
    """Wrap uvicorn's ``data_received`` to log unparsable requests and answer 400.

    Delegates to the original handler. If it raises, the peer address and a
    preview of the raw bytes are logged, a minimal ``400 Bad Request`` response
    is written and the connection is closed.
    """
    try:
        return original_data_received(self, data)
    except Exception as e:
        transport = self.transport
        # The transport may already be gone when parsing fails late.
        client = transport.get_extra_info('peername') if transport is not None else None
        hex_preview = data[:100].hex() if data else "empty"
        logger.error(f"Invalid HTTP request from {client}")
        logger.error(f"Error: {str(e)}")
        logger.error(f"First 100 bytes (hex): {hex_preview}")
        try:
            raw_data = data[:500].decode('utf-8', errors='replace')
            logger.error(f"Raw request data: {repr(raw_data)}")
        except Exception:
            pass
        try:
            body = b"Invalid HTTP request"
            # Content-Length is derived from the body; the old hard-coded 21
            # did not match the 20-byte payload.
            response = (
                b"HTTP/1.1 400 Bad Request\r\n"
                b"Content-Type: text/plain\r\n"
                b"Content-Length: " + str(len(body)).encode("ascii") + b"\r\n\r\n"
                + body
            )
            transport.write(response)
            transport.close()
        except Exception:
            # Best effort: the connection may already be closed.
            pass
        return


HttpToolsProtocol.data_received = patched_data_received
# ====================== ОСНОВНЫЕ ЭНДПОИНТЫ ======================
@app.get("/")
async def root():
    """Report that the server is up and point at the API documentation routes."""
    logger.info("Root endpoint accessed")
    payload = dict(
        status="ok",
        message="ZernMC Launcher Server is running",
        docs="/docs",
        redoc="/redoc",
    )
    return payload
@app.get("/health")
async def health():
    """Liveness probe: returns a fixed status plus the current UTC timestamp."""
    now = datetime.utcnow()
    return dict(status="healthy", timestamp=now.isoformat())
# ====================== WEB ИНТЕРФЕЙС ДЛЯ АКТИВАЦИИ ПРОХОДКИ ======================
@app.get("/activate-pass")
async def activate_pass_page():
    """Serve the web page used to activate a pass."""
    page = Response(ACTIVATE_PASS_HTML, media_type="text/html")
    return page
# ====================== ЭНДПОИНТЫ ДЛЯ ПАКОВ ======================
@app.get("/packs")
async def list_packs(
    request: Request,
    current_user: dict = Depends(get_current_user)
):
    """Return a summary of every pack directory; the caller must hold VIEW_PACKS.

    Each entry carries the data read from the pack's ``.meta`` file, an
    ``error`` field when that file cannot be loaded, or ``status: not_scanned``
    when no metadata exists yet.
    """
    if not has_permission(current_user["role"], Permissions.VIEW_PACKS):
        raise HTTPException(403, "Requires active pass")

    summaries = []
    for pack_dir in PACKS_DIR.iterdir():
        if not pack_dir.is_dir():
            continue
        pack_name = pack_dir.name
        meta_file = DATA_DIR / f"{pack_name}.meta"
        if not meta_file.exists():
            summaries.append({"name": pack_name, "status": "not_scanned"})
            continue
        try:
            with open(meta_file, 'r', encoding='utf-8') as handle:
                meta = json.load(handle)
            updated_at = meta.get("updated_at")
            if updated_at and isinstance(updated_at, datetime):
                updated_at = updated_at.isoformat()
            # An optional description.txt supplies the human-readable text.
            desc_file = pack_dir / "description.txt"
            description = desc_file.read_text(encoding="utf-8") if desc_file.exists() else ""
            summaries.append({
                "name": pack_name,
                "version": meta.get("version", 1),
                "files_count": len(meta.get("files", {})),
                "updated_at": updated_at,
                "minecraft_version": meta.get("minecraft_version", "unknown"),
                "loader_type": meta.get("loader_type", "vanilla"),
                "loader_version": meta.get("loader_version"),
                "asset_index": meta.get("asset_index"),
                "description": description
            })
        except Exception as e:
            logger.error(f"Failed to load pack meta for {pack_dir.name}: {e}")
            summaries.append({"name": pack_name, "error": str(e)})
    return {"packs": summaries}
@app.post("/pack/{pack_name}/diff")
async def get_pack_diff(
    pack_name: str,
    request: Request,
    current_user: dict = Depends(get_current_user)
):
    """Compare the client's file hashes with the server manifest of a pack.

    The client sends a JSON object ``{"mods/jei.jar": "sha256_hash", ...}``.
    The response lists files to download (missing or changed), files to delete
    (unknown to the server) and, as a subset of the downloads, files that are
    being updated. Requires the DOWNLOAD_PACK permission.

    Raises:
        HTTPException: 403 without permission, 400 for a body that is not a
            JSON object, 404 for an unknown pack, 500 for other manifest errors.
    """
    # Downloading packs requires an active pass.
    if not has_permission(current_user["role"], Permissions.DOWNLOAD_PACK):
        raise HTTPException(
            status_code=403,
            detail="Для скачивания сборок требуется активная проходка. Обратитесь к администратору."
        )
    client_ip = request.client.host if request.client else "unknown"
    # Read the request body
    try:
        body = await request.json()
    except Exception as e:
        logger.error(f"Failed to parse JSON body: {e}")
        raise HTTPException(400, "Invalid JSON body")
    if not isinstance(body, dict):
        # A list or scalar would otherwise fail later with an AttributeError (500).
        raise HTTPException(400, "Request body must be a JSON object mapping paths to hashes")
    logger.info("Received diff request",
                pack=pack_name,
                client_files_count=len(body),
                client_ip=client_ip)
    try:
        meta = get_cached_manifest(pack_name)
        if not meta:
            meta = await scan_pack(pack_name)
    except FileNotFoundError:
        logger.warning("Pack not found", pack=pack_name, client_ip=client_ip)
        raise HTTPException(404, "Pack not found")
    except Exception as e:
        logger.error("Error loading pack meta", pack=pack_name, error=str(e), exc_info=True)
        raise HTTPException(500, f"Internal server error: {str(e)}")

    to_download = []
    to_update = []
    server_files = meta.files
    for path, entry in server_files.items():
        client_hash = body.get(path)
        if client_hash is not None and client_hash == entry.hash:
            continue  # client already has this exact file
        to_download.append({
            "path": path,
            "url": f"/pack/{pack_name}/file/{path}",
            "size": entry.size,
            "hash": entry.hash
        })
        if client_hash is not None:
            to_update.append(path)
    # Anything the client has that the server does not know must be removed.
    to_delete = [path for path in body if path not in server_files]

    logger.info("Diff calculated",
                pack=pack_name,
                version=meta.version,
                to_download=len(to_download),
                to_delete=len(to_delete),
                to_update=len(to_update),
                client_ip=client_ip)
    return {
        "version": meta.version,
        "to_download": to_download,
        "to_delete": to_delete,
        "to_update": to_update
    }
@app.get("/pack/{pack_name}")
async def get_pack_manifest(pack_name: str, request: Request):
    """Return the manifest of a pack, preferring the cached copy.

    The ``X-Cached`` response header tells whether the manifest came from the
    cache or was loaded from the pack's ``.meta`` file. The file is looked up
    in ``DATA_DIR``, the same location ``list_packs`` reads from.

    Raises:
        HTTPException: 404 when no metadata file exists for the pack.
    """
    client_ip = request.client.host if request.client else "unknown"
    cached_meta = get_cached_manifest(pack_name)
    if cached_meta:
        logger.debug("Manifest served from cache",
                     pack=pack_name,
                     version=cached_meta.version,
                     client_ip=client_ip)
        # mode='json' converts datetime fields to strings during serialization
        return JSONResponse(
            content=cached_meta.model_dump(mode='json'),
            headers={"X-Pack-Version": str(cached_meta.version), "X-Cached": "true"}
        )
    meta_path = DATA_DIR / f"{pack_name}.meta"
    if not meta_path.exists():
        logger.warning("Manifest requested but pack not found", pack=pack_name, client_ip=client_ip)
        raise HTTPException(404, "Pack not found")
    with open(meta_path, 'r', encoding='utf-8') as f:
        meta_dict = json.load(f)
    # Build the model object via model_validate
    meta = PackMeta.model_validate(meta_dict)
    manifest_cache[pack_name] = meta
    logger.debug("Manifest served from disk",
                 pack=pack_name,
                 version=meta.version,
                 client_ip=client_ip)
    return JSONResponse(
        content=meta.model_dump(mode='json'),
        headers={"X-Pack-Version": str(meta.version), "X-Cached": "false"}
    )
@app.get("/pack/{pack_name}/file/{file_path:path}")
async def get_pack_file(pack_name: str, file_path: str, request: Request):
    """Serve a single file from a pack directory.

    Raises:
        HTTPException: 403 for a path that could leave the pack directory,
            404 when the file does not exist.
    """
    client_ip = request.client.host if request.client else None
    # Security: prevent path traversal. Besides "..", an absolute file_path
    # (e.g. a URL ending in "/file//etc/passwd") must be rejected, because
    # joining an absolute path discards the pack root entirely.
    unsafe_pack = pack_name in ("", ".", "..") or "/" in pack_name or "\\" in pack_name
    unsafe_file = (
        ".." in file_path
        or file_path.startswith(("/", "\\"))
        or Path(file_path).is_absolute()
    )
    if unsafe_pack or unsafe_file:
        raise HTTPException(403, "Invalid file path")
    full_path = PACKS_DIR / pack_name / file_path
    if not full_path.exists() or not full_path.is_file():
        logger.warning("File not found",
                       pack=pack_name,
                       file=file_path,
                       client_ip=client_ip)
        raise HTTPException(404, "File not found")
    logger.info("Serving file",
                pack=pack_name,
                file=file_path,
                size=full_path.stat().st_size,
                client_ip=client_ip)
    return await send_file_async(full_path, request, cache=True)
# ====================== ЭНДПОИНТЫ ДЛЯ ЛАУНЧЕРА ======================
def get_current_launcher_version() -> str:
    """Return the newest launcher version.

    Uses the first entry of the version meta list when available, otherwise
    the contents of ``builds/build.version``, otherwise ``"1.0.0"``.
    """
    known = get_launcher_versions()
    if not known:
        # Legacy layout: a plain text file holding the version string.
        legacy_marker = BUILDS_DIR / "build.version"
        if legacy_marker.exists():
            return legacy_marker.read_text().strip()
        return "1.0.0"
    return known[0]["meta"]["version"]
def parse_version(version_str: str) -> dict:
    """Split ``major.minor.patch[suffix]`` into its parts.

    A version counts as legacy when it does not match that shape or when any
    suffix remains after stripping dashes from both ends.
    """
    import re
    parsed = re.match(r'^(\d+)\.(\d+)\.(\d+)(.*)$', version_str)
    if parsed is None:
        return dict(major=0, minor=0, patch=0, suffix=version_str, is_legacy=True)
    *numbers, tail = parsed.groups()
    major, minor, patch = (int(n) for n in numbers)
    tail = tail.strip("-")
    return dict(major=major, minor=minor, patch=patch, suffix=tail, is_legacy=tail != "")
def is_new_format(filename: str) -> bool:
    """Tell whether a file name carries the new-format build prefix."""
    new_format_prefix = "ZernMC-win-"
    return filename.startswith(new_format_prefix)
# ====================== ЛАУНЧЕР МЕТА СИСТЕМА ======================
def calculate_file_hash(file_path: Path) -> str:
    """Return the hex SHA-256 digest of a file, read in 8 KiB blocks."""
    import hashlib
    digest = hashlib.sha256()
    with open(file_path, 'rb') as stream:
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()
def generate_launcher_builds_meta():
    """Write ``builds/meta.json`` describing every file under ``builds/``.

    Nothing is written when an existing ``meta.json`` already records the
    current launcher version. Each file entry holds its path relative to
    ``builds/``, its size and a ``sha256:``-prefixed hash computed with
    ``calculate_file_hash``. Failures are logged; the function never raises
    for I/O problems while scanning or saving.
    """
    version = get_current_launcher_version()
    if not version:
        return
    meta_path = BUILDS_DIR / "meta.json"

    # Check if meta exists and is fresh
    if meta_path.exists():
        try:
            with open(meta_path, 'r', encoding='utf-8') as f:
                existing = json.load(f)
        except (OSError, ValueError) as e:
            # Unreadable or corrupt file: fall through and regenerate it.
            logger.debug(f"Existing launcher meta.json is unreadable, regenerating: {e}")
        else:
            if isinstance(existing, dict) and existing.get("version") == version:
                logger.debug("Launcher meta.json already exists and is current")
                return

    # Generate new meta
    files = []
    try:
        for file_path in BUILDS_DIR.rglob("*"):
            if not file_path.is_file() or file_path.name == "meta.json":
                continue
            files.append({
                "path": str(file_path.relative_to(BUILDS_DIR)),
                "size": file_path.stat().st_size,
                "hash": f"sha256:{calculate_file_hash(file_path)}"
            })
    except Exception as e:
        logger.warning(f"Failed to generate launcher meta: {e}")
        return

    meta = {
        "version": version,
        "type": "builds",
        "release_date": datetime.utcnow().isoformat(),
        "files": files,
        "mirrors": [{"name": name, "url": url} for name, url in LAUNCHER_MIRRORS.items()]
    }
    try:
        with open(meta_path, 'w', encoding='utf-8') as f:
            json.dump(meta, f, indent=2)
        logger.info(f"Generated launcher meta.json with {len(files)} files")
    except Exception as e:
        logger.warning(f"Failed to save meta.json: {e}")
def scan_launcher_version(version: str) -> Optional[dict]:
    """Return the metadata of a launcher version, generating it when missing.

    Lookup order: ``builds/meta.json`` when it records this version, then the
    ``meta.json`` stored in ``builds/versions/<version>/``, and finally a fresh
    scan of that directory (which is also saved as its ``meta.json``).

    Returns:
        The metadata dict, or ``None`` when the version name is not a plain
        directory name or no such version directory exists.
    """
    # The version can originate from a URL; it must name a direct child of
    # VERSIONS_DIR and nothing else.
    if not version or version in (".", "..") or "/" in version or "\\" in version:
        return None

    # First check if meta exists in builds/ directly (for new format)
    builds_meta_path = BUILDS_DIR / "meta.json"
    if builds_meta_path.exists():
        try:
            with open(builds_meta_path, 'r', encoding='utf-8') as f:
                meta = json.load(f)
            if meta.get("version") == version:
                return meta
        except Exception:
            # Unreadable builds meta: fall back to the versions directory.
            pass

    # Fallback: check versions directory
    version_path = VERSIONS_DIR / version
    if not version_path.exists() or not version_path.is_dir():
        return None
    meta_path = version_path / "meta.json"

    # Check cache first
    if meta_path.exists():
        try:
            with open(meta_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception:
            # Corrupt cached meta: regenerate below.
            pass

    # Generate meta
    files = []
    for file_path in version_path.rglob("*"):
        if file_path.is_file() and file_path.name != "meta.json":
            files.append({
                "path": str(file_path.relative_to(version_path)),
                "size": file_path.stat().st_size,
                "hash": f"sha256:{calculate_file_hash(file_path)}"
            })
    meta = {
        "version": version,
        "type": "new",
        "release_date": datetime.utcnow().isoformat(),
        "files": files
    }

    # Save meta
    try:
        with open(meta_path, 'w', encoding='utf-8') as f:
            json.dump(meta, f, indent=2)
    except Exception as e:
        logger.warning(f"Failed to save launcher meta for {version}: {e}")
    return meta
def parse_version_key(v: str) -> tuple:
    """Return a tuple of ints usable as a numeric sort key for a dotted version.

    Versions that are not purely numeric dot-separated fields (for example
    ``"1.0.8-beta"`` or an empty string) sort as ``(0, 0, 0)``.
    """
    try:
        return tuple(int(part) for part in v.split("."))
    except (ValueError, AttributeError, TypeError):
        # Narrowed from a bare except so that KeyboardInterrupt/SystemExit propagate.
        return (0, 0, 0)
def get_launcher_versions() -> list:
    """List every version directory that has metadata, newest version first."""
    if not VERSIONS_DIR.exists():
        return []
    found = []
    for entry in VERSIONS_DIR.iterdir():
        if not entry.is_dir():
            continue
        meta = scan_launcher_version(entry.name)
        if meta:
            found.append({"version": entry.name, "meta": meta})
    return sorted(found, key=lambda item: parse_version_key(item["version"]), reverse=True)
def get_launcher_version_meta(version: str) -> Optional[dict]:
    """Look up the metadata of one launcher version (None when unavailable)."""
    version_meta = scan_launcher_version(version)
    return version_meta
def extract_new_format_versions():
    """Unpack each ``ZernMC-win-*.zip`` in builds/ into ``versions/<version>/``.

    Archives whose target directory already contains a ``meta.json`` are
    skipped; extraction failures are logged and do not stop the loop.
    """
    import zipfile
    VERSIONS_DIR.mkdir(exist_ok=True)
    # Snapshot the matches before extracting anything.
    archives = list(BUILDS_DIR.glob("ZernMC-win-*.zip"))
    for archive in archives:
        version = archive.stem.replace("ZernMC-win-", "")
        target_dir = VERSIONS_DIR / version
        already_done = target_dir.exists() and (target_dir / "meta.json").exists()
        if already_done:
            logger.debug(f"Version {version} already extracted")
            continue
        logger.info(f"Extracting {archive.name} to versions/{version}/...")
        try:
            with zipfile.ZipFile(archive, 'r') as bundle:
                bundle.extractall(target_dir)
            logger.info(f"Extracted {archive.name} successfully")
        except Exception as e:
            logger.error(f"Failed to extract {archive.name}: {e}")
# ====================== END ЛАУНЧЕР МЕТА СИСТЕМА ======================
def get_available_zips() -> list:
    """List the ``ZernMCLauncher-*.zip`` archives in builds/, newest version first.

    Names recognised by ``is_new_format`` are skipped. Every entry reports the
    version, file name, size, modification time and the legacy flag derived
    from ``parse_version``.
    """
    if not BUILDS_DIR.exists():
        return []
    archives = []
    for archive in BUILDS_DIR.glob("ZernMCLauncher-*.zip"):
        if is_new_format(archive.name):
            continue
        version = archive.stem.replace("ZernMCLauncher-", "")
        legacy_flag = parse_version(version)["is_legacy"]
        info = archive.stat()
        archives.append({
            "version": version,
            "filename": archive.name,
            "size": info.st_size,
            "modified": datetime.fromtimestamp(info.st_mtime).isoformat(),
            "is_legacy": legacy_flag
        })
    return sorted(archives, key=lambda item: parse_version_key(item["version"]), reverse=True)
def get_new_format_zips() -> list:
    """List the new-format archives (``ZernMC-win-*.zip``) in builds/, newest first."""
    if not BUILDS_DIR.exists():
        return []

    def describe(archive):
        # One stat call supplies both the size and the modification time.
        info = archive.stat()
        return {
            "version": archive.stem.replace("ZernMC-win-", ""),
            "filename": archive.name,
            "size": info.st_size,
            "modified": datetime.fromtimestamp(info.st_mtime).isoformat()
        }

    archives = [describe(archive) for archive in BUILDS_DIR.glob("ZernMC-win-*.zip")]
    archives.sort(key=lambda item: parse_version_key(item["version"]), reverse=True)
    return archives
def get_legacy_zips() -> list:
    """List legacy ``ZernMCLauncher-*.zip`` archives, newest version first.

    An archive is legacy when its version has a suffix (or is unparsable) or
    is lower than 1.0.8.
    """
    if not BUILDS_DIR.exists():
        return []

    def counts_as_legacy(parts: dict) -> bool:
        if parts["is_legacy"] or parts["major"] < 1:
            return True
        return parts["major"] == 1 and parts["minor"] == 0 and parts["patch"] < 8

    archives = []
    for archive in BUILDS_DIR.glob("ZernMCLauncher-*.zip"):
        version = archive.stem.replace("ZernMCLauncher-", "")
        if not counts_as_legacy(parse_version(version)):
            continue
        info = archive.stat()
        archives.append({
            "version": version,
            "filename": archive.name,
            "size": info.st_size,
            "modified": datetime.fromtimestamp(info.st_mtime).isoformat(),
            "is_legacy": True
        })
    return sorted(archives, key=lambda item: parse_version_key(item["version"]), reverse=True)
@app.get("/launcher/version")
async def get_launcher_version():
    """Return the current launcher version and the downloads that are available.

    Download links are only included for artefacts that exist in builds/. The
    JAR is detected under either spelling (``ZernMCLauncher.jar`` or
    ``zernmclauncher.jar``), because the download endpoint serves the
    lower-case name; checking only one spelling could advertise a link that
    returns 404 on a case-sensitive filesystem.
    """
    version = get_current_launcher_version()
    new_zips = get_new_format_zips()
    legacy_zips = get_legacy_zips()
    response = {
        "version": version,
        "updated_at": datetime.utcnow().isoformat()
    }

    jar_candidates = (BUILDS_DIR / "ZernMCLauncher.jar", BUILDS_DIR / "zernmclauncher.jar")
    jar_path = next((p for p in jar_candidates if p.exists()), None)
    if jar_path is not None:
        response["download_jar"] = "/launcher/download/jar"
        response["jar_size"] = jar_path.stat().st_size

    exe_path = BUILDS_DIR / "ZernMCLauncher.exe"
    if exe_path.exists():
        response["download_exe"] = "/launcher/download/exe"
        response["exe_size"] = exe_path.stat().st_size

    if new_zips:
        newest = new_zips[0]
        response["download_zip"] = f"/launcher/download/zip/{newest['filename']}"
        response["zip_version"] = newest["version"]
        response["zip_size"] = newest["size"]
        response["all_zips"] = new_zips

    if legacy_zips:
        response["legacy_zips"] = legacy_zips
        response["legacy_download_url"] = "/launcher/download/legacy"
    return response
@app.get("/launcher/download/jar")
async def download_launcher_jar(request: Request = None):
    """Download the launcher JAR file.

    The file is looked up as ``zernmclauncher.jar`` first and then as
    ``ZernMCLauncher.jar``, the spelling ``/launcher/version`` checks for, so
    both endpoints agree on case-sensitive filesystems.

    Raises:
        HTTPException: 404 when neither file exists.
    """
    candidates = (BUILDS_DIR / "zernmclauncher.jar", BUILDS_DIR / "ZernMCLauncher.jar")
    file_path = next((p for p in candidates if p.exists()), None)
    if file_path is None:
        raise HTTPException(404, "JAR file not found")
    if request:
        return await send_file_async(file_path, request, content_type="application/java-archive", cache=True)
    # Direct (non-HTTP) invocation without a request object.
    return FileResponse(
        path=file_path,
        filename=file_path.name,
        media_type="application/java-archive"
    )
@app.get("/launcher/download/exe")
async def download_launcher_exe(request: Request = None):
    """Serve the Windows launcher executable from builds/ (404 when absent)."""
    exe_media_type = "application/vnd.microsoft.portable-executable"
    file_path = BUILDS_DIR / "ZernMCLauncher.exe"
    if not file_path.exists():
        raise HTTPException(404, "EXE file not found")
    if not request:
        # No request object available: fall back to a plain file response.
        return FileResponse(
            path=file_path,
            filename="ZernMCLauncher.exe",
            media_type=exe_media_type
        )
    return await send_file_async(file_path, request, content_type=exe_media_type, cache=True)
@app.get("/launcher/download/zip/{filename}")
async def download_launcher_zip(filename: str, request: Request = None):
    """Serve one launcher ZIP archive from builds/.

    Only names that start with an accepted prefix, end in ``.zip`` and contain
    no ``..`` are allowed (400 otherwise); a missing file yields 404.
    """
    accepted_prefixes = ("ZernMCLauncher-", "ZernMC-win-")
    name_is_valid = (
        ".." not in filename
        and filename.startswith(accepted_prefixes)
        and filename.endswith(".zip")
    )
    if not name_is_valid:
        raise HTTPException(400, "Invalid filename")
    file_path = BUILDS_DIR / filename
    if not file_path.exists():
        raise HTTPException(404, "ZIP file not found")
    if not request:
        return FileResponse(
            path=file_path,
            filename=filename,
            media_type="application/zip"
        )
    return await send_file_async(file_path, request, content_type="application/zip", cache=True)
@app.get("/launcher/download/latest")
async def download_latest_launcher():
    """Serve the newest new-format archive (``ZernMC-win-*.zip``), or 404 if none exists."""
    candidates = get_new_format_zips()
    if not candidates:
        raise HTTPException(404, "No new format launcher files available")
    newest_name = candidates[0]["filename"]
    return await download_launcher_zip(newest_name)
@app.get("/launcher/download/legacy")
async def download_legacy_launcher():
    """Serve the newest legacy archive (below 1.0.8 or with a suffix), or 404 if none exists."""
    candidates = get_legacy_zips()
    if not candidates:
        raise HTTPException(404, "No legacy launcher files available")
    newest_name = candidates[0]["filename"]
    return await download_launcher_zip(newest_name)
# NOTE: this second definition intentionally carries no route decorator. The
# path "/launcher/download/zip/{filename}" is already registered by the earlier
# definition; registering it again created a duplicate route that could never
# match. The function is kept because it rebinds the module-level name that the
# "latest" and "legacy" download endpoints call directly.
async def download_launcher_zip(filename: str):
    """Return a ``FileResponse`` for a launcher ZIP archive in builds/.

    Raises:
        HTTPException: 400 for a name containing ``..``, lacking an accepted
            prefix or not ending in ``.zip``; 404 when the file is missing.
    """
    if ".." in filename:
        raise HTTPException(400, "Invalid filename")
    valid_patterns = ("ZernMCLauncher-", "ZernMC-win-")
    if not filename.startswith(valid_patterns) or not filename.endswith(".zip"):
        raise HTTPException(400, "Invalid filename")
    file_path = BUILDS_DIR / filename
    if not file_path.exists():
        raise HTTPException(404, "ZIP file not found")
    return FileResponse(
        path=file_path,
        filename=filename,
        media_type="application/zip"
    )
# ====================== ЛАУНЧЕР МЕТА ЭНДПОИНТЫ ======================
@app.get("/launcher/meta")
async def get_launcher_meta_list():
    """Return every known launcher version together with its metadata."""
    listing = []
    for item in get_launcher_versions():
        listing.append({"version": item["version"], "meta": item["meta"]})
    return {"versions": listing}
@app.get("/launcher/mirrors")
async def get_launcher_mirrors():
    """Return the configured mirrors as a list of name/url pairs."""
    mirrors_list = []
    for mirror_name, mirror_url in LAUNCHER_MIRRORS.items():
        mirrors_list.append({"name": mirror_name, "url": mirror_url})
    return {"mirrors": mirrors_list}
# ====================== SYNC FOR MIRRORS ======================
def verify_sync_api_key(request: Request):
    """Check the ``X-Sync-Key`` request header against the configured sync key.

    The comparison is constant-time so response timing does not reveal how
    much of a guessed key is correct.

    Raises:
        HTTPException: 401 when the header is missing, empty or wrong.
    """
    import hmac
    api_key = request.headers.get("X-Sync-Key")
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    if not api_key or not hmac.compare_digest(
        api_key.encode("utf-8"), SYNC_API_KEY.encode("utf-8")
    ):
        raise HTTPException(401, "Invalid or missing sync API key")
@app.get("/launcher/sync/{version}")
async def get_sync_manifest(version: str, request: Request):
    """Return the sync manifest a mirror uses to replicate a launcher version.

    The manifest lists every file of the version (path, size, hash and download
    URL). The ``delete`` list is always empty, and incremental sync is not
    implemented: the previously read ``If-Modified-Since`` header was never
    used and is no longer read.

    Raises:
        HTTPException: 401 for a bad sync key, 403 when this server is not the
            main server, 404 for an unknown version.
    """
    verify_sync_api_key(request)
    if SERVER_ROLE != "main":
        raise HTTPException(403, "Sync only available on main server")
    # Get server meta
    meta = get_launcher_version_meta(version)
    if not meta:
        raise HTTPException(404, f"Version {version} not found")
    # Build sync response
    files = [
        {
            "path": f["path"],
            "size": f["size"],
            "hash": f["hash"],
            "url": f"/launcher/file/{version}/{f['path']}"
        }
        for f in meta.get("files", [])
    ]
    return {
        "version": version,
        "files": files,
        "delete": [],  # Files that were removed from main
        "timestamp": datetime.utcnow().isoformat()
    }
@app.get("/launcher/sync/{version}/file/{file_path:path}")
async def sync_download_file(version: str, file_path: str, request: Request):
    """Send one file from ``BUILDS_DIR`` to a mirror that is syncing.

    Args:
        version: Version segment of the URL; not used to locate the file.
        file_path: Path of the file relative to ``BUILDS_DIR``.
        request: Incoming request; must carry a valid sync API key.

    Raises:
        HTTPException: 401 from ``verify_sync_api_key``; 403 when this server
            is not the main server or the path is rejected; 404 when the
            path is not an existing regular file.
    """
    verify_sync_api_key(request)
    if SERVER_ROLE != "main":
        raise HTTPException(403, "Sync only available on main server")

    # Reject ".." and anything carrying a root or drive: joining an anchored
    # path onto BUILDS_DIR discards BUILDS_DIR entirely, and the ``:path``
    # converter lets the captured value start with "/".
    if ".." in file_path or Path(file_path).anchor:
        raise HTTPException(403, "Invalid path")

    full_path = BUILDS_DIR / file_path
    # is_file() rather than exists(): a directory is not downloadable.
    if not full_path.is_file():
        raise HTTPException(404, "File not found")
    return await send_file_async(full_path, request, cache=False)
@app.get("/launcher/meta/{version}")
async def get_launcher_version_meta_handler(version: str):
    """Return the meta of one launcher version.

    Args:
        version: Launcher version to look up.

    Raises:
        HTTPException: 404 when ``get_launcher_version_meta`` yields a falsy
            result for the version.
    """
    found = get_launcher_version_meta(version)
    if found:
        return found
    raise HTTPException(404, f"Version {version} not found or not in new format")
@app.post("/launcher/diff")
async def get_launcher_diff(request: Request):
    """Compare the client's file hashes with the latest launcher version.

    The request body is a JSON object ``{path: hash, ...}`` describing the
    files the client currently has.

    Returns:
        A dict with the latest ``version``, ``to_download`` (server file
        entries that the client lacks or holds with a different hash) and
        ``to_delete`` (client paths that the server version does not list).

    Raises:
        HTTPException: 400 when the body is not valid JSON or not a JSON
            object; 404 when no launcher versions are available.
    """
    # Starlette parses with json.loads, whose decode error is a ValueError.
    try:
        client_hashes = await request.json()
    except ValueError:
        raise HTTPException(400, "Request body must be valid JSON") from None
    # Anything but an object would fail later on ``client_hashes[path]``.
    if not isinstance(client_hashes, dict):
        raise HTTPException(
            400, "Request body must be a JSON object mapping file paths to hashes"
        )

    versions = get_launcher_versions()
    if not versions:
        raise HTTPException(404, "No versions available")
    meta = versions[0]["meta"]  # first entry is treated as the latest version

    server_files = {f["path"]: f for f in meta["files"]}

    # New or changed files: missing on the client, or present with another hash.
    to_download = [
        file_info
        for path, file_info in server_files.items()
        if path not in client_hashes or client_hashes[path] != file_info["hash"]
    ]
    # Stale files: present on the client but no longer listed by the server.
    to_delete = [path for path in client_hashes if path not in server_files]

    return {
        "version": meta["version"],
        "to_download": to_download,
        "to_delete": to_delete,
    }
@app.get("/launcher/file/{version}/{file_path:path}")
async def get_launcher_file(version: str, file_path: str, request: Request):
    """Download a specific file from a launcher version.

    The file is looked up in ``BUILDS_DIR`` first (where zernmc.exe, lib,
    assets and so on live) and then in ``VERSIONS_DIR / version`` as a
    fallback.

    Args:
        version: Launcher version; only used for the fallback location.
        file_path: Path of the requested file relative to either directory.
        request: Incoming request, forwarded to ``send_file_async``.

    Raises:
        HTTPException: 403 when ``file_path`` or ``version`` is rejected;
            404 when neither location holds a regular file at that path.
    """
    # Security: prevent path traversal. Besides "..", refuse any path with a
    # root or drive — joining an anchored path onto a base directory discards
    # the base, and the ``:path`` converter lets the value start with "/".
    # ``version`` is joined into a path as well, so it gets the ".." check.
    if ".." in file_path or ".." in version or Path(file_path).anchor:
        raise HTTPException(403, "Invalid file path")

    candidates = (
        BUILDS_DIR / file_path,
        VERSIONS_DIR / version / file_path,  # fallback: per-version directory
    )
    for candidate in candidates:
        if candidate.is_file():
            return await send_file_async(candidate, request, cache=True)
    raise HTTPException(404, "File not found: " + file_path)
@app.get("/launcher/download/zip/{version}")
async def download_launcher_zip_version(version: str, request: Request = None):
    """Download the full ``ZernMC-win-<version>.zip`` archive (for new installs).

    Args:
        version: Version label interpolated into the archive file name.
        request: Incoming request. When present the file is sent through
            ``send_file_async``; when ``None`` (a direct in-process call) a
            plain ``FileResponse`` is returned instead.

    Raises:
        HTTPException: 400 when ``version`` contains ``..`` or a path
            separator; 404 when the archive is not present in ``BUILDS_DIR``.

    NOTE(review): this path template has the same shape as
    ``/launcher/download/zip/{filename}`` registered earlier in this file.
    Route lookup picks the first registered match, so confirm that this
    handler is actually reachable over HTTP.
    """
    # ``version`` becomes part of a filesystem path; refuse anything that
    # could point outside BUILDS_DIR.
    if ".." in version or "/" in version or "\\" in version:
        raise HTTPException(400, "Invalid version")

    zip_path = BUILDS_DIR / f"ZernMC-win-{version}.zip"
    if not zip_path.exists():
        raise HTTPException(404, f"ZIP for version {version} not found")
    if request:
        return await send_file_async(
            zip_path, request, content_type="application/zip", cache=True
        )
    # Fallback without request
    return FileResponse(
        path=zip_path,
        filename=f"ZernMC-win-{version}.zip",
        media_type="application/zip",
    )
# ====================== END LAUNCHER META ENDPOINTS ======================
@app.get("/launcher/info")
async def get_launcher_full_info():
    """Describe the current launcher and every downloadable artifact.

    Returns:
        A dict with the current version, a UTC ISO timestamp, the ``files``
        section (jar/exe details when those files exist in ``BUILDS_DIR``,
        plus all ZIPs), the ``recommended`` artifact kind, and summaries for
        the new-format and legacy archives.
    """
    current = get_current_launcher_version()
    modern_zips = get_new_format_zips()
    old_zips = get_legacy_zips()

    # Preference order: new-format ZIP, then the exe if it exists, else the jar.
    if modern_zips:
        recommended = "zip"
    elif (BUILDS_DIR / "ZernMCLauncher.exe").exists():
        recommended = "exe"
    else:
        recommended = "jar"

    files = {
        "jar": None,
        "exe": None,
        "zips": modern_zips + old_zips,
    }
    info = {
        "current_version": current,
        "updated_at": datetime.utcnow().isoformat(),
        "files": files,
        "recommended": recommended,
        "new_format": {
            "available": len(modern_zips) > 0,
            "latest": modern_zips[0] if modern_zips else None,
            "download_url": "/launcher/download/latest",
        },
        "legacy": {
            "available": len(old_zips) > 0,
            "count": len(old_zips),
            "latest": old_zips[0] if old_zips else None,
            "download_url": "/launcher/download/legacy",
            "warning": "Legacy builds are technically compatible but not recommended. Consider upgrading to new format.",
        },
    }

    # Fill in size and link for the standalone binaries that are present.
    standalone = (
        ("jar", "ZernMCLauncher.jar", "/launcher/download/jar"),
        ("exe", "ZernMCLauncher.exe", "/launcher/download/exe"),
    )
    for key, file_name, link in standalone:
        binary = BUILDS_DIR / file_name
        if binary.exists():
            files[key] = {
                "size": binary.stat().st_size,
                "download_url": link,
            }

    if modern_zips:
        files["latest_zip"] = modern_zips[0]
        files["download_latest"] = "/launcher/download/latest"
    return info
# ====================== NEWS ======================
# Directory named "news" located next to this source file.
NEWS_DIR = Path(__file__).parent.joinpath("news")
@app.get("/news")
async def list_news():
    """Return every news item found in ``NEWS_DIR``.

    Each ``*.txt`` file is read as UTF-8. Line 1 is the title, line 2 the
    type, line 3 the version and everything after that is the body. Files
    with fewer than four lines are skipped, and files that fail to load are
    logged and skipped.

    Returns:
        ``{"news": [...]}`` ordered by file path, descending; an empty list
        when ``NEWS_DIR`` does not exist.
    """
    if not NEWS_DIR.exists():
        return {"news": []}

    collected = []
    for entry in sorted(NEWS_DIR.iterdir()):
        if not (entry.is_file() and entry.suffix == ".txt"):
            continue
        try:
            lines = entry.read_text(encoding="utf-8").strip().split("\n")
            if len(lines) < 4:
                continue
            collected.append({
                "id": entry.stem,
                "title": lines[0].strip(),
                "type": lines[1].strip(),
                "version": lines[2].strip(),
                "body": "\n".join(lines[3:]).strip(),
            })
        except Exception as e:
            # Best effort: one unreadable file must not hide the others.
            logger.warning(f"Failed to read news file {entry.name}: {e}")

    # Items were gathered in ascending path order; hand them back reversed.
    return {"news": collected[::-1]}
@app.get("/news/{news_id}")
async def get_news(news_id: str):
    """Get a single news item by ID.

    The item is read from ``NEWS_DIR / "<news_id>.txt"`` as UTF-8. Line 1 is
    the title, line 2 the type, line 3 the version and the rest is the body.

    Args:
        news_id: File stem of the news item.

    Raises:
        HTTPException: 400 when ``news_id`` is not a bare file name or the
            file has fewer than four lines; 404 when no such file exists.
    """
    # ``news_id`` becomes part of a filesystem path. Accept only a bare name:
    # if pathlib sees any separator (platform dependent), ``name`` differs
    # from the input and the id could point outside NEWS_DIR.
    if Path(news_id).name != news_id:
        raise HTTPException(400, "Invalid news id")

    file_path = NEWS_DIR / f"{news_id}.txt"
    # is_file() rather than exists(): a directory cannot be read as text.
    if not file_path.is_file():
        raise HTTPException(404, "News not found")

    content = file_path.read_text(encoding="utf-8").strip().split("\n")
    if len(content) < 4:
        raise HTTPException(400, "Invalid news file format")
    return {
        "id": file_path.stem,
        "title": content[0].strip(),
        "type": content[1].strip(),
        "version": content[2].strip(),
        "body": "\n".join(content[3:]).strip(),
    }
# ====================== PROXY ENDPOINTS ======================
# These endpoints let clients with network problems
# download files through the Zern server
# HTTP client for the proxy — created in lifespan, closed on shutdown
# (the lifespan code is outside this section; it stays None until assigned)
proxy_client: httpx.AsyncClient | None = None
# Cache for frequently requested data (5 minutes)
proxy_cache = TTLCache(maxsize=50, ttl=300)
@app.get("/proxy/fabric/versions/loader")
async def proxy_fabric_versions(request: Request):
    """Proxy the Fabric Meta API list of loader versions.

    The parsed JSON is kept in ``proxy_cache`` under the upstream URL and is
    served from there while the entry is present.

    Args:
        request: Incoming request; only used to log the client address.

    Raises:
        HTTPException: 502 on an ``httpx.HTTPError``; 500 on any other
            exception raised while fetching or building the response.
    """
    requester = request.client.host if request.client else "unknown"
    logger.info(f"Proxy request: Fabric versions from {requester}")
    url = "https://meta.fabricmc.net/v2/versions/loader"

    # Check the cache first
    if url in proxy_cache:
        logger.debug(f"Proxy cache hit for {url}")
        return JSONResponse(content=proxy_cache[url])

    try:
        upstream = await proxy_client.get(url)
        upstream.raise_for_status()
        payload = upstream.json()
        proxy_cache[url] = payload  # remember it for later requests
        logger.info(f"Proxy success: Fabric versions ({len(payload)} items)")
        return JSONResponse(content=payload)
    except httpx.HTTPError as err:
        logger.error(f"Proxy error for Fabric versions: {err}")
        raise HTTPException(502, f"Bad Gateway: {str(err)}")
    except Exception as err:
        logger.error(f"Proxy unexpected error: {err}")
        raise HTTPException(500, f"Internal error: {str(err)}")
@app.get("/proxy/fabric/installer/latest")
async def proxy_fabric_installer_latest(request: Request):
    """Return the latest Fabric Installer version.

    Fetches the installer's ``maven-metadata.xml`` and extracts the version
    from its ``<latest>`` element, falling back to ``<release>``.

    Args:
        request: Incoming request; only used to log the client address.

    Returns:
        ``JSONResponse`` with ``{"version": "<version>"}``.

    Raises:
        HTTPException: 502 on an ``httpx.HTTPError``; 500 when neither
            element is found in the metadata.
    """
    client_ip = request.client.host if request.client else "unknown"
    logger.info(f"Proxy request: Fabric installer latest from {client_ip}")
    url = "https://maven.fabricmc.net/net/fabricmc/fabric-installer/maven-metadata.xml"
    try:
        response = await proxy_client.get(url)
        response.raise_for_status()
        xml = response.text
        # Parse the latest version out of the XML. The pattern has to be
        # anchored on the element tags: an unanchored ``([^<]+)`` matches the
        # first run of text in the document (the XML prolog), not a version.
        match = re.search(r"<latest>([^<]+)</latest>", xml) or re.search(
            r"<release>([^<]+)</release>", xml
        )
        if not match:
            raise HTTPException(500, "Could not parse latest version")
        version = match.group(1).strip()
        logger.info(f"Proxy success: Latest Fabric installer version = {version}")
        return JSONResponse(content={"version": version})
    except httpx.HTTPError as e:
        logger.error(f"Proxy error for Fabric installer latest: {e}")
        raise HTTPException(502, f"Bad Gateway: {str(e)}")
@app.get("/proxy/fabric/installer/{version}")
async def proxy_fabric_installer_url(version: str, request: Request):
    """Build the download URL of a specific Fabric Installer version.

    No upstream request is made; the URL is assembled from ``version``.

    Args:
        version: Installer version to build the URL for.
        request: Incoming request; only used to log the client address.

    Returns:
        ``JSONResponse`` with the ``url`` and the requested ``version``.
    """
    requester = request.client.host if request.client else "unknown"
    logger.info(f"Proxy request: Fabric installer URL for v{version} from {requester}")
    url = f"https://maven.fabricmc.net/net/fabricmc/fabric-installer/{version}/fabric-installer-{version}.jar"
    answer = {"url": url, "version": version}
    return JSONResponse(content=answer)
@app.get("/proxy/fabric/maven/{path:path}")
async def proxy_fabric_maven(path: str, request: Request):
    """Proxy a file from the Fabric Maven repository.

    Args:
        path: Path below ``https://maven.fabricmc.net/``.
        request: Incoming request; only used to log the client address.

    Returns:
        The upstream body with a content type chosen from the extension
        (``.jar``, ``.pom``, otherwise a generic binary type).

    Raises:
        HTTPException: 502 on an ``httpx.HTTPError``.
    """
    requester = request.client.host if request.client else "unknown"
    logger.info(f"Proxy request: Fabric Maven {path} from {requester}")
    full_url = f"https://maven.fabricmc.net/{path}"
    try:
        upstream = await proxy_client.get(full_url)
        upstream.raise_for_status()
        # Pick the content type from the file extension
        if path.endswith(".jar"):
            media = "application/java-archive"
        elif path.endswith(".pom"):
            media = "application/xml"
        else:
            media = "application/octet-stream"
        return Response(
            content=upstream.content,
            media_type=media,
            headers={"X-Proxied-By": "ZernMC"},
        )
    except httpx.HTTPError as err:
        logger.error(f"Proxy error for Fabric Maven {path}: {err}")
        raise HTTPException(502, f"Bad Gateway: {str(err)}")
@app.get("/proxy/mojang/version_manifest")
async def proxy_mojang_manifest(request: Request):
    """Proxy the Mojang version manifest (v2).

    The parsed JSON is kept in ``proxy_cache`` under the upstream URL and is
    served from there while the entry is present.

    Args:
        request: Incoming request; only used to log the client address.

    Raises:
        HTTPException: 502 on an ``httpx.HTTPError``.
    """
    requester = request.client.host if request.client else "unknown"
    logger.info(f"Proxy request: Mojang manifest from {requester}")
    url = "https://piston-meta.mojang.com/mc/game/version_manifest_v2.json"

    if url in proxy_cache:
        return JSONResponse(content=proxy_cache[url])

    try:
        upstream = await proxy_client.get(url)
        upstream.raise_for_status()
        manifest = upstream.json()
        proxy_cache[url] = manifest
        logger.info("Proxy success: Mojang manifest")
        return JSONResponse(content=manifest)
    except httpx.HTTPError as err:
        logger.error(f"Proxy error for Mojang manifest: {err}")
        raise HTTPException(502, f"Bad Gateway: {str(err)}")
@app.get("/proxy/mojang/version/{version_id}")
async def proxy_mojang_version(version_id: str, request: Request):
    """Proxy the Mojang metadata document of one specific version.

    The version's URL is looked up in the version manifest and remembered in
    ``proxy_cache`` under ``version_url_<version_id>``; the version document
    itself is fetched on every call.

    Args:
        version_id: Version identifier as it appears in the manifest.
        request: Incoming request; only used to log the client address.

    Raises:
        HTTPException: 404 when the manifest has no such version; 502 on an
            ``httpx.HTTPError`` from either upstream request.
    """
    requester = request.client.host if request.client else "unknown"
    logger.info(f"Proxy request: Mojang version {version_id} from {requester}")

    # The manifest is needed first, to find the URL of the version document
    manifest_url = "https://piston-meta.mojang.com/mc/game/version_manifest_v2.json"
    cache_key = f"version_url_{version_id}"
    version_url = proxy_cache.get(cache_key)

    if not version_url:
        try:
            manifest_response = await proxy_client.get(manifest_url)
            manifest_response.raise_for_status()
            for entry in manifest_response.json().get("versions", []):
                if entry.get("id") != version_id:
                    continue
                version_url = entry.get("url")
                proxy_cache[cache_key] = version_url
                break
            if not version_url:
                raise HTTPException(404, f"Version {version_id} not found")
        except httpx.HTTPError as err:
            logger.error(f"Proxy error getting version URL: {err}")
            raise HTTPException(502, f"Bad Gateway: {str(err)}")

    try:
        version_response = await proxy_client.get(version_url)
        version_response.raise_for_status()
        document = version_response.json()
        logger.info(f"Proxy success: Mojang version {version_id}")
        return JSONResponse(content=document)
    except httpx.HTTPError as err:
        logger.error(f"Proxy error for Mojang version {version_id}: {err}")
        raise HTTPException(502, f"Bad Gateway: {str(err)}")
@app.get("/proxy/forge/versions")
async def proxy_forge_versions(request: Request):
    """Proxy the Forge version list (``maven-metadata.xml``).

    Args:
        request: Incoming request; only used to log the client address.

    Returns:
        The upstream XML body, unmodified, as ``application/xml``.

    Raises:
        HTTPException: 502 on an ``httpx.HTTPError``.
    """
    requester = request.client.host if request.client else "unknown"
    logger.info(f"Proxy request: Forge versions from {requester}")
    url = "https://maven.minecraftforge.net/net/minecraftforge/forge/maven-metadata.xml"
    try:
        upstream = await proxy_client.get(url)
        upstream.raise_for_status()
        # Pass the XML through as is
        extra_headers = {"X-Proxied-By": "ZernMC"}
        return Response(
            content=upstream.content,
            media_type="application/xml",
            headers=extra_headers,
        )
    except httpx.HTTPError as err:
        logger.error(f"Proxy error for Forge versions: {err}")
        raise HTTPException(502, f"Bad Gateway: {str(err)}")
@app.get("/proxy/forge/maven/{path:path}")
async def proxy_forge_maven(path: str, request: Request):
    """Proxy a file from the Forge Maven repository.

    Args:
        path: Path below ``https://maven.minecraftforge.net/``.
        request: Incoming request; only used to log the client address.

    Returns:
        The upstream body with a content type chosen from the extension
        (``.jar``, ``.pom``, otherwise a generic binary type).

    Raises:
        HTTPException: 502 on an ``httpx.HTTPError``.
    """
    requester = request.client.host if request.client else "unknown"
    logger.info(f"Proxy request: Forge Maven {path} from {requester}")
    full_url = f"https://maven.minecraftforge.net/{path}"
    try:
        upstream = await proxy_client.get(full_url)
        upstream.raise_for_status()
        if path.endswith(".jar"):
            media = "application/java-archive"
        elif path.endswith(".pom"):
            media = "application/xml"
        else:
            media = "application/octet-stream"
        return Response(
            content=upstream.content,
            media_type=media,
            headers={"X-Proxied-By": "ZernMC"},
        )
    except httpx.HTTPError as err:
        logger.error(f"Proxy error for Forge Maven {path}: {err}")
        raise HTTPException(502, f"Bad Gateway: {str(err)}")
@app.get("/proxy/neoforge/versions")
async def proxy_neoforge_versions(request: Request):
    """Proxy the NeoForge version list (``maven-metadata.xml``).

    Args:
        request: Incoming request; only used to log the client address.

    Returns:
        The upstream XML body, unmodified, as ``application/xml``.

    Raises:
        HTTPException: 502 on an ``httpx.HTTPError``.
    """
    requester = request.client.host if request.client else "unknown"
    logger.info(f"Proxy request: NeoForge versions from {requester}")
    url = "https://maven.neoforged.net/releases/net/neoforged/neoforge/maven-metadata.xml"
    try:
        upstream = await proxy_client.get(url)
        upstream.raise_for_status()
        extra_headers = {"X-Proxied-By": "ZernMC"}
        return Response(
            content=upstream.content,
            media_type="application/xml",
            headers=extra_headers,
        )
    except httpx.HTTPError as err:
        logger.error(f"Proxy error for NeoForge versions: {err}")
        raise HTTPException(502, f"Bad Gateway: {str(err)}")
@app.get("/proxy/neoforge/maven/{path:path}")
async def proxy_neoforge_maven(path: str, request: Request):
    """Proxy a file from the NeoForge Maven repository.

    Args:
        path: Path below ``https://maven.neoforged.net/``.
        request: Incoming request; only used to log the client address.

    Returns:
        The upstream body with a content type chosen from the extension
        (``.jar``, ``.pom``, otherwise a generic binary type).

    Raises:
        HTTPException: 502 on an ``httpx.HTTPError``.
    """
    requester = request.client.host if request.client else "unknown"
    logger.info(f"Proxy request: NeoForge Maven {path} from {requester}")
    full_url = f"https://maven.neoforged.net/{path}"
    try:
        upstream = await proxy_client.get(full_url)
        upstream.raise_for_status()
        if path.endswith(".jar"):
            media = "application/java-archive"
        elif path.endswith(".pom"):
            media = "application/xml"
        else:
            media = "application/octet-stream"
        return Response(
            content=upstream.content,
            media_type=media,
            headers={"X-Proxied-By": "ZernMC"},
        )
    except httpx.HTTPError as err:
        logger.error(f"Proxy error for NeoForge Maven {path}: {err}")
        raise HTTPException(502, f"Bad Gateway: {str(err)}")
@app.get("/proxy/download")
async def proxy_download(request: Request):
    """Generic download proxy for an allow-listed set of upstream hosts.

    The target is taken from the ``url`` query parameter. Only ``http`` and
    ``https`` URLs whose host is one of the allowed domains (or a subdomain
    of one) are fetched.

    Args:
        request: Incoming request carrying the ``url`` query parameter.

    Returns:
        The upstream body with the upstream content type (generic binary
        when the upstream sends none).

    Raises:
        HTTPException: 400 when ``url`` is missing, unparsable or not
            http(s); 403 when the host is not allowed; 502 on an
            ``httpx.HTTPError``.
    """
    client_ip = request.client.host if request.client else "unknown"
    url = request.query_params.get("url")
    if not url:
        raise HTTPException(400, "Missing 'url' parameter")

    # Security: only these upstream hosts may be reached through the proxy
    allowed_domains = [
        "maven.fabricmc.net",
        "meta.fabricmc.net",
        "piston-meta.mojang.com",
        "launchermeta.mojang.com",
        "resources.download.minecraft.net",
        "maven.minecraftforge.net",
        "files.minecraftforge.net",
        "maven.neoforged.net",
    ]

    try:
        parsed = urlparse(url)
    except ValueError:
        raise HTTPException(400, "Invalid 'url' parameter") from None
    if parsed.scheme not in ("http", "https"):
        raise HTTPException(400, "Only http and https URLs can be proxied")

    # Use ``hostname``, not a hand-split ``netloc``: netloc also contains any
    # "user:password@" prefix, so a URL such as
    # "https://maven.fabricmc.net:@evil.example/" would pass a check on the
    # text before the first ":" while the request really goes to evil.example.
    domain = (parsed.hostname or "").lower()
    if domain not in allowed_domains and not any(domain.endswith(f".{d}") for d in allowed_domains):
        logger.warning(f"Proxy blocked: {domain} not in allowed list (client: {client_ip}, url: {url[:100]})")
        raise HTTPException(403, f"Domain {domain} not allowed")

    logger.info(f"Proxy download: {url[:100]}... from {client_ip}")
    try:
        # NOTE: the whole upstream body is read into memory before it is
        # returned; this is not a streaming response.
        response = await proxy_client.get(url)
        response.raise_for_status()
        # Take the content type from the upstream response
        content_type = response.headers.get("content-type", "application/octet-stream")
        return Response(
            content=response.content,
            media_type=content_type,
            headers={
                "X-Proxied-By": "ZernMC",
                "X-Original-Url": url[:100],  # for debugging only
            },
        )
    except httpx.HTTPError as e:
        logger.error(f"Proxy download error for {url[:100]}: {e}")
        raise HTTPException(502, f"Bad Gateway: {str(e)}")
@app.get("/proxy/asset/{hash}")
async def proxy_asset(hash: str, request: Request):
    """Proxy a Minecraft asset addressed by its hash.

    The asset is fetched from
    ``https://resources.download.minecraft.net/<first two chars>/<hash>``.

    Args:
        hash: Asset hash; must be exactly 40 hexadecimal characters. (The
            name shadows the builtin but is part of the route's interface.)
        request: Incoming request; only used to log the client address.

    Raises:
        HTTPException: 400 when ``hash`` is not a 40-character hex string;
            502 on an ``httpx.HTTPError``.
    """
    client_ip = request.client.host if request.client else "unknown"
    # The value is placed into an upstream URL, so accept nothing but a
    # well-formed SHA-1 hex digest (a bare length check lets arbitrary
    # characters through).
    if not re.fullmatch(r"[0-9a-fA-F]{40}", hash):
        raise HTTPException(400, "Invalid hash")
    url = f"https://resources.download.minecraft.net/{hash[:2]}/{hash}"
    logger.info(f"Proxy asset: {hash} from {client_ip}")
    try:
        response = await proxy_client.get(url)
        response.raise_for_status()
        return Response(
            content=response.content,
            media_type="application/octet-stream",
            headers={"X-Proxied-By": "ZernMC"},
        )
    except httpx.HTTPError as e:
        logger.error(f"Proxy asset error for {hash}: {e}")
        raise HTTPException(502, f"Bad Gateway: {str(e)}")
@app.get("/proxy/status")
async def proxy_status():
    """Report the proxy's status.

    Returns:
        A dict with a fixed ``status`` of ``"ok"``, the number of entries
        currently in ``proxy_cache``, the domains the download proxy accepts
        and a usage note.
    """
    return {
        "status": "ok",
        "cached_items": len(proxy_cache),
        # Mirrors the allow-list enforced by /proxy/download, including
        # files.minecraftforge.net, which was missing from this report.
        "allowed_domains": [
            "maven.fabricmc.net",
            "meta.fabricmc.net",
            "piston-meta.mojang.com",
            "launchermeta.mojang.com",
            "resources.download.minecraft.net",
            "maven.minecraftforge.net",
            "files.minecraftforge.net",
            "maven.neoforged.net",
        ],
        "note": "Use this proxy if you have network issues connecting to Fabric/Mojang/Forge",
    }
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Turn any otherwise unhandled exception into a JSON 500 response.

    Args:
        request: The request being processed; not inspected here.
        exc: The exception that reached the application boundary.

    Returns:
        ``JSONResponse`` with status 500 carrying a generic ``detail`` and
        the exception's class name under ``type``.
    """
    logger.error("Unhandled exception", exc_info=True)
    error_body = {
        "detail": "Internal Server Error",
        "type": type(exc).__name__,
    }
    return JSONResponse(content=error_body, status_code=500)
# ====================== STARTUP ======================
if __name__ == "__main__":
    args = parse_args()
    # Mode precedence: test, then sync, then development, else production.
    if args.test or args.sync:
        # Test and sync modes are coroutines and need an event loop.
        import asyncio
        asyncio.run(run_test_mode() if args.test else run_sync_mode())
    elif args.dev:
        run_development_mode(args.host, args.port, args.reload)
    else:
        run_production_mode(args.host, args.port, args.workers)