server update

This commit is contained in:
Sashegdev
2026-04-04 14:57:15 +00:00
parent cf4a5c74e5
commit 7670edbff7
10 changed files with 1132 additions and 0 deletions
+82
View File
@@ -0,0 +1,82 @@
# cli.py
import argparse
import sys
import asyncio
from pathlib import Path
import structlog
logger = structlog.get_logger(__name__)
def parse_args():
parser = argparse.ArgumentParser(description="ZernMC Launcher Server")
# Mode selection (mutually exclusive)
mode_group = parser.add_mutually_exclusive_group()
mode_group.add_argument("--dev", action="store_true", help="Development mode with auto-reload")
mode_group.add_argument("--prod", action="store_true", help="Production mode with 4 workers")
mode_group.add_argument("--test", action="store_true", help="Test mode - validate builds and generate manifests")
# Additional options
parser.add_argument("--host", default="0.0.0.0", help="Host to bind to (default: 0.0.0.0)")
parser.add_argument("--port", type=int, default=1582, help="Port to bind to (default: 1582)")
parser.add_argument("--workers", type=int, default=4, help="Number of workers for production mode")
parser.add_argument("--reload", action="store_true", help="Enable auto-reload (development)")
return parser.parse_args()
async def run_test_mode():
"""Validate all packs and generate/update manifests"""
logger.info("Running in TEST mode - validating builds and generating manifests")
from pack_manager import scan_pack, PACKS_DIR
pack_count = 0
error_count = 0
for pack_dir in PACKS_DIR.iterdir():
if pack_dir.is_dir():
try:
logger.info(f"Validating pack: {pack_dir.name}")
meta = await scan_pack(pack_dir.name)
pack_count += 1
logger.info(f"✓ Pack validated: {pack_dir.name} v{meta.version}, {len(meta.files)} files")
except Exception as e:
error_count += 1
logger.error(f"✗ Failed to validate pack {pack_dir.name}", error=str(e), exc_info=True)
logger.info(f"Test mode completed: {pack_count} packs validated, {error_count} errors")
if error_count > 0:
logger.error("Some packs failed validation")
sys.exit(1)
else:
logger.info("All packs validated successfully")
sys.exit(0)
def run_production_mode(host: str, port: int, workers: int):
"""Run with multiple workers"""
logger.info(f"Starting in PRODUCTION mode with {workers} workers on {host}:{port}")
import uvicorn
uvicorn.run(
"main:app",
host=host,
port=port,
workers=workers,
log_config=None,
access_log=False # We have our own logging middleware
)
def run_development_mode(host: str, port: int, reload: bool = True):
"""Run with auto-reload for development"""
logger.info(f"Starting in DEVELOPMENT mode with reload={reload} on {host}:{port}")
import uvicorn
uvicorn.run(
"main:app",
host=host,
port=port,
reload=reload,
log_config=None,
access_log=False
)
+25
View File
@@ -0,0 +1,25 @@
# http_logger.py
import logging
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import time
import uuid
logger = logging.getLogger("uvicorn.error")
class HTTPLogger:
"""Custom HTTP logger to catch invalid requests"""
@staticmethod
def log_invalid_request(data: bytes, client_addr: tuple):
"""Log invalid HTTP requests"""
try:
# Try to decode as much as possible
request_str = data.decode('utf-8', errors='replace')[:500]
logger.warning(
f"Invalid HTTP request received\n"
f"Client: {client_addr[0]}:{client_addr[1]}\n"
f"Data: {request_str}"
)
except Exception as e:
logger.warning(f"Invalid HTTP request from {client_addr}, could not decode: {e}")
+202
View File
@@ -0,0 +1,202 @@
# log_manager.py
import logging
import structlog
from pathlib import Path
import sys
import shutil
from datetime import datetime
import os
LOG_DIR = Path("logs")
LOG_DIR.mkdir(exist_ok=True)
# Путь к latest.log
LATEST_LOG = LOG_DIR / "latest.log"
def rotate_logs():
"""Rotate logs without compression (like Minecraft)"""
log_files = sorted(LOG_DIR.glob("*.log"), key=lambda p: p.stat().st_mtime, reverse=True)
for old_log in log_files[10:]:
if old_log.name != "latest.log":
try:
old_log.unlink()
except Exception:
pass
if LATEST_LOG.exists() and LATEST_LOG.stat().st_size > 0:
timestamp = datetime.fromtimestamp(LATEST_LOG.stat().st_mtime).strftime("%Y-%m-%d_%H-%M-%S")
backup_name = LOG_DIR / f"{timestamp}.log"
shutil.move(str(LATEST_LOG), str(backup_name))
def _add_location(logger, method_name, event_dict):
"""Add location to event dict"""
module = event_dict.pop("module", "unknown")
func_name = event_dict.pop("func_name", "")
# Store location
if func_name and func_name != "<module>":
event_dict["_location"] = f"{module}.{func_name}"
else:
event_dict["_location"] = module
return event_dict
class HumanConsoleRenderer:
"""Render logs in human-readable format with colors"""
def __init__(self):
self.colors = {
'debug': '\033[36m',
'info': '\033[32m',
'warning': '\033[33m',
'error': '\033[31m',
'critical': '\033[35m',
'reset': '\033[0m'
}
def __call__(self, logger, name, event_dict):
level = event_dict.get('level', 'info')
timestamp = event_dict.get('timestamp', datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
event = event_dict.get('event', '')
location = event_dict.get('_location', 'unknown')
# Format event with location
formatted_event = f"{event} [{location}]"
# Colorize level
color = self.colors.get(level, self.colors['reset'])
colored_level = f"{color}{level:<8}{self.colors['reset']}"
return f"{timestamp} [{colored_level}] {formatted_event}"
class HumanFileRenderer:
"""Render logs in human-readable format without colors for files"""
def __call__(self, logger, name, event_dict):
level = event_dict.get('level', 'info')
timestamp = event_dict.get('timestamp', datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
event = event_dict.get('event', '')
location = event_dict.get('_location', 'unknown')
# Format event with location
formatted_event = f"{event} [{location}]"
return f"{timestamp} [{level:<8}] {formatted_event}"
def setup_logging():
"""Setup human-readable logging"""
# Rotate logs on startup
rotate_logs()
# Configure structlog processors
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.stdlib.add_logger_name,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S.%f", utc=False),
structlog.processors.CallsiteParameterAdder(
{
structlog.processors.CallsiteParameter.MODULE,
structlog.processors.CallsiteParameter.FUNC_NAME,
}
),
_add_location,
],
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
# Configure standard logging
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.handlers.clear()
# Create formatters
class StructlogConsoleHandler(logging.StreamHandler):
def emit(self, record):
try:
# Convert record to event dict
event_dict = {
'level': record.levelname.lower(),
'timestamp': datetime.fromtimestamp(record.created).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],
'event': record.getMessage(),
'_location': getattr(record, 'module', 'unknown')
}
# Add function name if available
if hasattr(record, 'funcName') and record.funcName:
event_dict['_location'] = f"{record.module}.{record.funcName}"
# Render
renderer = HumanConsoleRenderer()
output = renderer(None, None, event_dict)
# Write to console
self.stream.write(output + '\n')
self.flush()
except Exception:
self.handleError(record)
class StructlogFileHandler(logging.FileHandler):
def emit(self, record):
try:
# Convert record to event dict
event_dict = {
'level': record.levelname.lower(),
'timestamp': datetime.fromtimestamp(record.created).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],
'event': record.getMessage(),
'_location': getattr(record, 'module', 'unknown')
}
# Add function name if available
if hasattr(record, 'funcName') and record.funcName:
event_dict['_location'] = f"{record.module}.{record.funcName}"
# Render
renderer = HumanFileRenderer()
output = renderer(None, None, event_dict)
# Write to file
self.stream.write(output + '\n')
self.flush()
except Exception:
self.handleError(record)
# Add handlers
console_handler = StructlogConsoleHandler(sys.stdout)
console_handler.setLevel(logging.DEBUG)
file_handler = StructlogFileHandler(LATEST_LOG, encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
root_logger.addHandler(console_handler)
root_logger.addHandler(file_handler)
# Reduce noise
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
logging.getLogger("uvicorn.error").setLevel(logging.INFO)
logging.getLogger("fastapi").setLevel(logging.INFO)
return structlog.get_logger()
def get_logger(name):
"""Get a logger instance"""
return logging.getLogger(name)
# Global logger instance
_logger = None
def init_logging():
"""Initialize logging system"""
global _logger
if _logger is None:
_logger = setup_logging()
# Use standard logging for the initial message
logging.info(f"Logging initialized (log_file: {LATEST_LOG})")
return _logger
+28
View File
@@ -0,0 +1,28 @@
# logging_config.py
import sys
from rich.traceback import install as install_rich_traceback
from log_manager import init_logging, get_logger, LATEST_LOG
import logging
install_rich_traceback(show_locals=False)
def setup_logging() -> None:
"""Setup human-readable logging"""
# Initialize the logger
logger_manager = init_logging()
# Determine mode
mode = "development"
if "--test" in sys.argv:
mode = "test"
elif "--prod" in sys.argv:
mode = "production"
elif "--dev" in sys.argv:
mode = "development"
# Log startup using standard logging
logger = get_logger(__name__)
logger.info(f"Server starting in {mode} mode")
return logger
+443
View File
@@ -0,0 +1,443 @@
# main.py
import os
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import FileResponse, JSONResponse
from contextlib import asynccontextmanager
from pathlib import Path
import json
import structlog
from cachetools import TTLCache
import asyncio
import logging
from datetime import datetime
import asyncio
from uvicorn.protocols.http.httptools_impl import HttpToolsProtocol
# Import local modules
from pack_manager import DATA_DIR, get_fabric_versions, get_forge_versions, scan_pack, get_cached_manifest, PACKS_DIR
from models import PackMeta
from logging_config import setup_logging
from middleware import LoggingMiddleware
from cli import parse_args, run_test_mode, run_production_mode, run_development_mode
from log_manager import init_logging, get_logger
from models import MinecraftVersion
from pack_manager import get_minecraft_versions, download_minecraft_version
import minecraft_launcher_lib.command as mcl_command
import minecraft_launcher_lib.utils as mcl_utils
from pathlib import Path
logger = structlog.get_logger(__name__)
# Cache for manifests - expires after 5 minutes
manifest_cache = TTLCache(maxsize=100, ttl=300)
@asynccontextmanager
async def lifespan(app: FastAPI):
args = parse_args()
# Initialize logging
init_logging()
logger = logging.getLogger(__name__)
# Determine environment
if args.test:
env = "test"
elif args.dev:
env = "development"
else:
env = "production"
logger.info(f"Starting ZernMC Launcher Server (environment: {env})")
if args.test:
await run_test_mode()
yield
return
logger.info("Scanning packs on startup...")
pack_dirs = [p for p in PACKS_DIR.iterdir() if p.is_dir()]
if not pack_dirs:
logger.warning(f"No packs found in directory: {PACKS_DIR}")
else:
for pack_dir in pack_dirs:
try:
meta = await scan_pack(pack_dir.name)
logger.info(f"Pack scanned successfully: {pack_dir.name} v{meta.version} ({len(meta.files)} files)")
except Exception as e:
logger.error(f"Failed to scan pack: {pack_dir.name} - {e}", exc_info=True)
logger.info("All packs ready. Server is running.")
yield
logger.info("Server shutting down...")
# Create app with lifespan
app = FastAPI(title="ZernMC Launcher Server", lifespan=lifespan)
# Add logging middleware
app.add_middleware(LoggingMiddleware)
# Monkey patch to catch invalid HTTP requests
original_data_received = HttpToolsProtocol.data_received
def patched_data_received(self, data):
try:
return original_data_received(self, data)
except Exception as e:
client = self.transport.get_extra_info('peername')
logger = logging.getLogger(__name__)
logger.warning(f"Invalid HTTP request from {client}: {str(e)[:200]}")
# Log raw data if possible
try:
raw_data = data[:500].decode('utf-8', errors='replace')
logger.debug(f"Raw request data: {raw_data}")
except:
pass
raise
HttpToolsProtocol.data_received = patched_data_received
@app.get("/")
async def root():
"""Root endpoint"""
logger = logging.getLogger(__name__)
logger.info("Root endpoint accessed")
return {
"status": "ok",
"message": "ZernMC Launcher Server is running",
"docs": "/docs",
"redoc": "/redoc"
}
@app.get("/health")
async def health():
"""Health check endpoint"""
return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}
@app.get("/packs")
async def list_packs():
"""List all available packs"""
logger = logging.getLogger(__name__)
packs = []
for pack_dir in PACKS_DIR.iterdir():
if pack_dir.is_dir():
meta_path = DATA_DIR / f"{pack_dir.name}.meta"
if meta_path.exists():
try:
async with open(meta_path, 'r', encoding='utf-8') as f:
meta = json.load(f)
packs.append({
"name": pack_dir.name,
"version": meta.get("version", 1),
"files_count": len(meta.get("files", {})),
"updated_at": meta.get("updated_at")
})
except Exception as e:
logger.error(f"Failed to load pack meta for {pack_dir.name}: {e}")
packs.append({
"name": pack_dir.name,
"error": str(e)
})
else:
packs.append({
"name": pack_dir.name,
"status": "not_scanned"
})
return {"packs": packs}
# ------------------- DIFF ENDPOINT -------------------
@app.post("/pack/{pack_name}/diff")
async def get_pack_diff(pack_name: str, client_files: dict[str, str], request: Request):
"""
Client sends: { "mods/jei.jar": "sha256_hash", ... }
Server returns diff information
"""
client_ip = request.client.host if request.client else "unknown"
logger.info("Received diff request",
pack=pack_name,
client_files_count=len(client_files),
client_ip=client_ip)
try:
# Use cached manifest if available
meta = get_cached_manifest(pack_name)
if not meta:
meta = await scan_pack(pack_name)
except FileNotFoundError:
logger.warning("Pack not found", pack=pack_name, client_ip=client_ip)
raise HTTPException(404, "Pack not found")
except Exception as e:
logger.error("Error loading pack meta", pack=pack_name, error=str(e), exc_info=True)
raise HTTPException(500, "Internal server error")
to_download = []
to_delete = []
to_update = []
server_files = meta.files
# Calculate what needs to be downloaded or updated
for path, entry in server_files.items():
client_hash = client_files.get(path)
if client_hash is None or client_hash != entry.hash:
url = f"/pack/{pack_name}/file/{path}"
to_download.append({
"path": path,
"url": url,
"size": entry.size,
"hash": entry.hash
})
if client_hash is not None:
to_update.append(path)
# Calculate what needs to be deleted
for path in client_files:
if path not in server_files:
to_delete.append(path)
logger.info("Diff calculated",
pack=pack_name,
version=meta.version,
to_download=len(to_download),
to_delete=len(to_delete),
to_update=len(to_update),
client_ip=client_ip)
return {
"version": meta.version,
"to_download": to_download,
"to_delete": to_delete,
"to_update": to_update
}
@app.get("/minecraft/versions")
async def list_minecraft_versions():
"""List available Minecraft versions"""
try:
versions = await get_minecraft_versions()
return {"versions": [v.model_dump() for v in versions]}
except Exception as e:
logger.error(f"Failed to fetch Minecraft versions: {e}")
raise HTTPException(500, "Failed to fetch versions")
@app.get("/minecraft/version/{version}")
async def get_version_details(version: str):
"""Get details for specific Minecraft version"""
# This would fetch version JSON from Mojang
return {"version": version, "status": "available"}
@app.post("/minecraft/download/{version}")
async def download_version(version: str, request: Request):
"""Download Minecraft version"""
client_ip = request.client.host if request.client else "unknown"
logger.info(f"Download request for Minecraft {version}", client_ip=client_ip)
version_path = Path("versions") / version
version_path.mkdir(parents=True, exist_ok=True)
success = await download_minecraft_version(version, version_path)
if success:
return {"status": "success", "version": version}
raise HTTPException(404, "Version not found")
@app.get("/modloaders/{loader_type}")
async def get_modloaders(loader_type: str, minecraft_version: str = None):
"""Get available mod loaders"""
if loader_type == "fabric":
versions = await get_fabric_versions(minecraft_version) if minecraft_version else []
return {"loader": "fabric", "versions": versions}
elif loader_type == "forge":
versions = await get_forge_versions(minecraft_version) if minecraft_version else []
return {"loader": "forge", "versions": versions}
elif loader_type == "vanilla":
return {"loader": "vanilla", "versions": ["vanilla"]}
raise HTTPException(400, "Invalid loader type")
@app.get("/pack/{pack_name}")
async def get_pack_manifest(pack_name: str, request: Request):
"""Get pack manifest with caching"""
client_ip = request.client.host if request.client else "unknown"
# Check cache first
cached_meta = get_cached_manifest(pack_name)
if cached_meta:
logger.debug("Manifest served from cache",
pack=pack_name,
version=cached_meta.version,
client_ip=client_ip)
return JSONResponse(
content=cached_meta.model_dump(),
headers={"X-Pack-Version": str(cached_meta.version), "X-Cached": "true"}
)
# Load from disk if not in cache
meta_path = Path("data") / f"{pack_name}.meta"
if not meta_path.exists():
logger.warning("Manifest requested but pack not found", pack=pack_name, client_ip=client_ip)
raise HTTPException(404, "Pack not found")
async with open(meta_path, 'r', encoding='utf-8') as f:
meta_dict = json.load(f)
meta = PackMeta.model_validate(meta_dict)
# Update cache
manifest_cache[pack_name] = meta
logger.debug("Manifest served from disk",
pack=pack_name,
version=meta.version,
client_ip=client_ip)
return JSONResponse(
content=meta_dict,
headers={"X-Pack-Version": str(meta.version), "X-Cached": "false"}
)
@app.post("/pack/{pack_name}/launch")
async def get_launch_command(
pack_name: str,
request: Request,
options: dict # клиент присылает: {"username": "...", "uuid": "...", "token": "...", "gameDir": "...", "javaPath": "...", "maxMemory": 4096}
):
client_ip = request.client.host if request.client else "unknown"
logger.info("Launch command requested", pack=pack_name, client_ip=client_ip)
try:
meta = get_cached_manifest(pack_name) or await scan_pack(pack_name)
except Exception as e:
raise HTTPException(404, "Pack not found") from e
# Базовые опции для minecraft-launcher-lib
launch_options = {
"username": options.get("username", "Player"),
"uuid": options.get("uuid", "00000000-0000-0000-0000-000000000000"),
"token": options.get("token", "token"),
"gameDirectory": options.get("gameDir", str(Path("."))), # относительный
"executablePath": options.get("javaPath", "java"), # или полный путь
"maxMemory": options.get("maxMemory", 4096),
"customResolution": options.get("customResolution", False),
# можно добавить width/height и т.д.
}
# minecraft_directory — это корень пака у клиента (куда скачаны файлы)
minecraft_dir = launch_options["gameDirectory"]
try:
# Генерируем команду (библиотека сама обработает Fabric/Forge через version.json)
command_list = mcl_command.get_minecraft_command(
version=meta.minecraft_version,
minecraft_directory=minecraft_dir,
options=launch_options
)
# Превращаем абсолютные пути в относительные (очень важно для портативности)
relative_command = []
game_dir_path = Path(minecraft_dir).resolve()
for arg in command_list:
try:
arg_path = Path(arg)
if arg_path.is_absolute() and game_dir_path in arg_path.parents:
rel = arg_path.relative_to(game_dir_path).as_posix()
relative_command.append(f"./{rel}" if os.name != "nt" else rel.replace("/", "\\"))
else:
relative_command.append(arg)
except Exception:
relative_command.append(arg)
# Дополнительно: если в PackMeta есть свои jvmArgs/gameArgs — мерджим
if hasattr(meta, 'launch') and meta.launch:
# Добавляем кастомные jvmArgs в начало и т.д.
pass
return {
"version": meta.version,
"minecraftVersion": meta.minecraft_version,
"loaderType": meta.loader_type,
"command": relative_command, # список строк — готов к subprocess
"workingDirectory": ".", # относительный
"mainClass": meta.launch.mainClass if hasattr(meta, 'launch') else None,
"classpath": meta.launch.classpath if hasattr(meta, 'launch') else []
}
except Exception as e:
logger.error("Failed to generate launch command", pack=pack_name, error=str(e), exc_info=True)
raise HTTPException(500, f"Failed to generate launch command: {str(e)}")
@app.get("/pack/{pack_name}/file/{file_path:path}")
async def get_pack_file(pack_name: str, file_path: str, request: Request):
full_path = PACKS_DIR / pack_name / file_path
client_ip = request.client.host if request.client else None
if not full_path.exists() or not full_path.is_file():
logger.warning("File not found",
pack=pack_name,
file=file_path,
client_ip=client_ip)
raise HTTPException(404, "File not found")
logger.info("Serving file",
pack=pack_name,
file=file_path,
size=full_path.stat().st_size,
client_ip=client_ip)
return FileResponse(full_path)
@app.get("/launcher/version")
async def get_launcher_version():
"""Возвращает информацию о текущей версии лаунчера"""
version_file = Path("builds/build.version")
version = "1.0.0"
if version_file.exists():
version = version_file.read_text().strip()
return {
"version": version,
"download_jar": "/launcher/download?type=jar",
"download_exe": "/launcher/download?type=exe",
"updated_at": datetime.utcnow().isoformat()
}
@app.get("/launcher/download")
async def download_launcher(type: str = "exe"):
"""Отдаёт файл лаунчера"""
if type == "exe":
file_path = Path("builds/ZernMCLauncher.exe")
filename = "ZernMCLauncher.exe"
else:
file_path = Path("builds/ZernMCLauncher.jar")
filename = "ZernMCLauncher.jar"
if not file_path.exists():
raise HTTPException(404, "Launcher file not found")
return FileResponse(
path=file_path,
filename=filename,
media_type="application/octet-stream"
)
if __name__ == "__main__":
args = parse_args()
if args.test:
# Test mode runs within lifespan
import asyncio
asyncio.run(run_test_mode())
elif args.dev:
run_development_mode(args.host, args.port, args.reload)
else:
# Default to production
run_production_mode(args.host, args.port, args.workers)
+47
View File
@@ -0,0 +1,47 @@
# middleware.py
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import logging
import time
import uuid
import traceback
logger = logging.getLogger(__name__)
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
# Generate request ID
request_id = str(uuid.uuid4())[:8]
# Get client IP
client_ip = request.client.host if request.client else "unknown"
forwarded = request.headers.get("x-forwarded-for")
if forwarded:
client_ip = forwarded.split(",")[0].strip()
# Log incoming request
logger.info(f"{request.method} {request.url.path} (IP: {client_ip}, ID: {request_id})")
# Start timer
start_time = time.time()
try:
response = await call_next(request)
# Calculate duration
duration = (time.time() - start_time) * 1000
# Log response
logger.info(f"{request.method} {request.url.path}{response.status_code} ({duration:.0f}ms) [ID: {request_id}]")
# Add request ID to response headers
response.headers["X-Request-ID"] = request_id
return response
except Exception as e:
duration = (time.time() - start_time) * 1000
# Log full traceback
error_traceback = traceback.format_exc()
logger.error(f"{request.method} {request.url.path} → ERROR: {str(e)} (ID: {request_id})\n{error_traceback}")
raise
+63
View File
@@ -0,0 +1,63 @@
from pydantic import BaseModel, Field
from typing import Dict, List, Optional
from datetime import datetime
import structlog
logger = structlog.get_logger(__name__)
class FileEntry(BaseModel):
path: str # относительный путь (mods/jei.jar)
hash: str # sha256
size: int
added_at: datetime
modified_at: datetime
class LaunchConfig(BaseModel):
mainClass: str
classpath: List[str] = Field(default_factory=list) # относительные пути от gameDir
jvmArgs: List[str] = Field(default_factory=list)
gameArgs: List[str] = Field(default_factory=list)
nativesPath: Optional[str] = None # например: "natives"
assetIndex: str = "1.20" # или актуальная версия
minecraftVersion: str
loaderType: str # "vanilla" | "fabric" | "forge" | "neoforge" | "quilt"
loaderVersion: Optional[str] = None
gameDirectory: str = "." # "." = корень инсталляции пака (рекомендую)
class PackMeta(BaseModel):
pack_name: str
version: int = 1
updated_at: datetime = Field(default_factory=datetime.utcnow)
files: Dict[str, FileEntry] = Field(default_factory=dict)
ignored_dirs: List[str] = Field(
default_factory=lambda: [
"resourcepacks", "shaderpacks", "saves", "logs",
"crash-reports", "screenshots", "journeymap", "config"
]
)
# Основные поля (один раз!)
minecraft_version: str
loader_type: str
loader_version: Optional[str] = None
# Конфигурация запуска (обязательна)
launch: LaunchConfig
class MinecraftVersion(BaseModel):
version: str
type: str # release, snapshot, old_alpha, old_beta
release_time: datetime
url: Optional[str] = None
class ModLoader(BaseModel):
type: str
version: str
minecraft_version: str
installer_url: Optional[str] = None
libraries: List[str] = Field(default_factory=list)
+178
View File
@@ -0,0 +1,178 @@
# pack_manager.py (updated)
import hashlib
import os
from datetime import datetime
from pathlib import Path
import json
import aiofiles
from typing import Optional, Dict
import structlog
from models import PackMeta, FileEntry
import aiohttp
from typing import List, Optional
logger = structlog.get_logger(__name__)
PACKS_DIR = Path("packs")
DATA_DIR = Path("data")
DATA_DIR.mkdir(exist_ok=True)
MINECRAFT_VERSION_MANIFEST_URL = "https://launchermeta.mojang.com/mc/game/version_manifest.json"
FABRIC_META_URL = "https://meta.fabricmc.net/v2/versions"
FORGE_META_URL = "https://files.minecraftforge.net/net/minecraftforge/forge/"
MinecraftVersion=[]
# Cache for loaded manifests
_manifest_cache: Dict[str, PackMeta] = {}
def get_cached_manifest(pack_name: str) -> Optional[PackMeta]:
"""Get manifest from memory cache if available"""
return _manifest_cache.get(pack_name)
def get_meta_path(pack_name: str) -> Path:
return DATA_DIR / f"{pack_name}.meta"
async def calculate_sha256(file_path: Path) -> str:
hash_sha = hashlib.sha256()
async with aiofiles.open(file_path, 'rb') as f:
while chunk := await f.read(8192):
hash_sha.update(chunk)
return hash_sha.hexdigest()
async def get_minecraft_versions() -> List[MinecraftVersion]:
"""Fetch available Minecraft versions from Mojang"""
async with aiohttp.ClientSession() as session:
async with session.get(MINECRAFT_VERSION_MANIFEST_URL) as response:
data = await response.json()
versions = []
for v in data.get("versions", []):
versions.append(MinecraftVersion(
version=v["id"],
type=v["type"],
release_time=datetime.fromisoformat(v["releaseTime"].replace('Z', '+00:00')),
url=v["url"]
))
return versions
async def get_fabric_versions(minecraft_version: str) -> List[str]:
"""Get available Fabric versions for specific Minecraft version"""
async with aiohttp.ClientSession() as session:
async with session.get(f"{FABRIC_META_URL}/loader") as response:
data = await response.json()
fabric_versions = []
for loader in data:
if loader.get("stable", True):
fabric_versions.append(loader["version"])
return fabric_versions
async def get_forge_versions(minecraft_version: str) -> List[str]:
"""Get available Forge versions for specific Minecraft version"""
async with aiohttp.ClientSession() as session:
async with session.get(FORGE_META_URL) as response:
# Forge API is more complex, simplified for now
# You might want to use a proper forge API
return [] # Placeholder
async def download_minecraft_version(version: str, target_path: Path) -> bool:
"""Download Minecraft version JSON and client jar"""
# Get version manifest
async with aiohttp.ClientSession() as session:
async with session.get(f"https://piston-meta.mojang.com/mc/game/{version}/{version}.json") as response:
if response.status == 200:
version_data = await response.json()
# Save version JSON
async with aiofiles.open(target_path / f"{version}.json", 'w') as f:
await f.write(json.dumps(version_data, indent=2))
# Download client jar
downloads = version_data.get("downloads", {})
client_info = downloads.get("client", {})
if client_info:
client_url = client_info.get("url")
async with session.get(client_url) as client_response:
if client_response.status == 200:
async with aiofiles.open(target_path / f"{version}.jar", 'wb') as f:
async for chunk in client_response.content.iter_chunked(8192):
await f.write(chunk)
return True
return False
async def scan_pack(pack_name: str, force_rescan: bool = False) -> PackMeta:
"""Scan pack directory and update manifest if needed"""
pack_path = PACKS_DIR / pack_name
if not pack_path.exists() or not pack_path.is_dir():
raise FileNotFoundError(f"Pack {pack_name} not found")
meta_path = get_meta_path(pack_name)
current_meta: Optional[PackMeta] = None
# Check if we have cached version and force_rescan is False
if not force_rescan and pack_name in _manifest_cache:
return _manifest_cache[pack_name]
if meta_path.exists():
async with aiofiles.open(meta_path, 'r', encoding='utf-8') as f:
data = json.loads(await f.read())
current_meta = PackMeta.model_validate(data)
new_files: Dict[str, FileEntry] = {}
changed = False
for root, dirs, files in os.walk(pack_path):
# Filter ignored directories
ignored = current_meta.ignored_dirs if current_meta else []
dirs[:] = [d for d in dirs if d not in ignored]
for file in files:
file_path = Path(root) / file
rel_path = file_path.relative_to(pack_path).as_posix()
# Skip files in ignored directories
if any(ignored_dir in rel_path.split('/') for ignored_dir in ignored):
continue
stat = file_path.stat()
file_hash = await calculate_sha256(file_path)
entry = FileEntry(
path=rel_path,
hash=file_hash,
size=stat.st_size,
added_at=datetime.utcfromtimestamp(stat.st_ctime),
modified_at=datetime.utcfromtimestamp(stat.st_mtime)
)
new_files[rel_path] = entry
if current_meta and (rel_path not in current_meta.files or
current_meta.files[rel_path].hash != file_hash):
changed = True
# Update manifest if changes detected or new pack
if not current_meta or changed or len(new_files) != len(current_meta.files if current_meta else 0):
version = (current_meta.version + 1) if current_meta else 1
new_meta = PackMeta(
pack_name=pack_name,
version=version,
files=new_files,
updated_at=datetime.utcnow(),
ignored_dirs=current_meta.ignored_dirs if current_meta else []
)
async with aiofiles.open(meta_path, 'w', encoding='utf-8') as f:
await f.write(new_meta.model_dump_json(indent=2))
# Update memory cache
_manifest_cache[pack_name] = new_meta
logger.info("Pack updated", pack=pack_name, new_version=version, files_count=len(new_files))
return new_meta
# Update cache with existing manifest
if current_meta:
_manifest_cache[pack_name] = current_meta
return current_meta
+8
View File
@@ -0,0 +1,8 @@
fastapi>=0.115.0
uvicorn[standard]>=0.30.0
pydantic>=2.9.0
aiofiles>=24.1.0
python-multipart>=0.0.9
watchfiles>=0.24.0
structlog>=24.4.0
rich>=13.9.0
+56
View File
@@ -0,0 +1,56 @@
# view_logs.py - полезный скрипт для просмотра логов
#!/usr/bin/env python3
import sys
from pathlib import Path
from datetime import datetime
LOG_DIR = Path("logs")
def list_logs():
"""List all available log files"""
log_files = sorted(LOG_DIR.glob("*.log"), key=lambda p: p.stat().st_mtime, reverse=True)
print("Available logs:")
print("-" * 50)
for i, log_file in enumerate(log_files, 1):
size = log_file.stat().st_size / 1024 # KB
modified = datetime.fromtimestamp(log_file.stat().st_mtime).strftime("%Y-%m-%d %H:%M:%S")
current = " (current)" if log_file.name == "latest.log" else ""
print(f"{i}. {log_file.name}{current} - {size:.1f} KB - {modified}")
return log_files
def view_log(log_file):
"""View a log file"""
if not log_file.exists():
print(f"Log file not found: {log_file}")
return
print(f"\n=== {log_file.name} ===\n")
with open(log_file, 'r') as f:
# Show last 50 lines by default
lines = f.readlines()
if len(lines) > 50:
print(f"... (showing last 50 of {len(lines)} lines) ...\n")
lines = lines[-50:]
for line in lines:
print(line.rstrip())
if __name__ == "__main__":
if len(sys.argv) > 1:
if sys.argv[1] == "list":
list_logs()
elif sys.argv[1].isdigit():
logs = list_logs()
idx = int(sys.argv[1]) - 1
if 0 <= idx < len(logs):
view_log(logs[idx])
else:
print("Invalid log number")
else:
# Try to open as filename
view_log(LOG_DIR / sys.argv[1])
else:
# Show latest.log by default
view_log(LOG_DIR / "latest.log")