#!/usr/bin/env python3 """ ZernProxy Manager v22 - Soft-Tariff with SQLite """ import json import base64 import logging import asyncio import io import os import qrcode import uuid import hashlib import secrets import sqlite3 import time from datetime import datetime, timedelta from qrcode.image.styledpil import StyledPilImage from qrcode.image.styles.moduledrawers import SquareModuleDrawer from typing import List, Dict, Optional, Tuple from contextlib import contextmanager, asynccontextmanager from fastapi import FastAPI, Response, HTTPException, Query, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, RedirectResponse from fastapi.staticfiles import StaticFiles import uvicorn import httpx import re from py3xui import Api import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger("zernproxy") BASE_DIR = os.path.dirname(os.path.abspath(__file__)) SERVERS_CONF = os.path.join(BASE_DIR, "servers.conf") SETTINGS_CONF = os.path.join(BASE_DIR, "settings.conf") USERS_DB = os.path.join(BASE_DIR, "users.db") LOGO_PATH = os.path.join(BASE_DIR, "logo.png") MOTD_PATH = os.path.join(BASE_DIR, "motd.txt") WEB_DIR = os.path.join(BASE_DIR, "web") HASHES_FILE = os.path.join(BASE_DIR, "hashes.txt") servers = [] settings = {} last_donation_id = 0 # In-memory cache _links_cache: Dict[str, Tuple[List[str], float]] = {} _traffic_cache: Dict[str, Tuple[Dict, float]] = {} CACHE_TTL_TRAFFIC = 120 CACHE_TTL_LINKS = 86400 * 7 def _get_cached(key: str, cache: dict, ttl: float): entry = cache.get(key) if entry and time.time() - entry[1] < ttl: return entry[0] return None def _set_cache(key: str, value, cache: dict): cache[key] = (value, time.time()) def get_cached_links(sub_id: str): return _get_cached(sub_id, _links_cache, CACHE_TTL_LINKS) def set_cached_links(sub_id: str, links: List[str]): _set_cache(sub_id, links, _links_cache) def get_cached_traffic(sub_id: str): return _get_cached(sub_id, _traffic_cache, CACHE_TTL_TRAFFIC) def set_cached_traffic(sub_id: str, data: dict): _set_cache(sub_id, data, _traffic_cache) def clear_cache(sub_id: str = None): if sub_id: _links_cache.pop(sub_id, None) _traffic_cache.pop(sub_id, None) else: _links_cache.clear() _traffic_cache.clear() def get_wdtt_servers() -> List[dict]: """Возвращает серверы с is_wdtt: true""" return [s for s in servers if s.get("is_wdtt") and s.get("is_active")] async def _wdtt_api_call(server: dict, method: str, path: str, body: dict = None) -> dict: """Вызов REST API wdtt-server""" api_key = server.get("wdtt_api_key", "") host = server.get("wdtt_host", "") if not api_key or not host: return {"error": "wdtt server not configured"} port = server.get("wdtt_api_port", "8080") url = f"http://{host}:{port}{path}" headers = {"X-API-Key": api_key, "Content-Type": "application/json"} try: async with httpx.AsyncClient(timeout=10.0, verify=False) as client: if method == "POST": resp = await client.post(url, json=body, headers=headers) elif method == "DELETE": resp = await client.request("DELETE", url, json=body, headers=headers) else: resp = await client.get(url, headers=headers) if resp.status_code in (200, 201): return resp.json() else: try: err = resp.json() return {"error": err.get("error", f"HTTP {resp.status_code}")} except: return {"error": f"HTTP {resp.status_code}"} except Exception as e: logger.error(f"wdtt API call error: {e}") return {"error": str(e)[:100]} async def sync_wdtt_password(user: dict): """Создаёт или обновляет wdtt-пароль для премиум пользователя""" wdtt_servers = get_wdtt_servers() if not wdtt_servers: return tier = user.get("tier", "free") remaining_days = get_remaining_days(user) conn = get_db() try: db_user = conn.execute("SELECT * FROM users WHERE id = ?", (user["id"],)).fetchone() if not db_user: return db_user = dict(db_user) expires_at = db_user.get("tariff_end_at", 0) current_password = db_user.get("wdtt_password") # Если не премиум — удаляем пароль if tier != "paid" or remaining_days <= 0: if current_password: for srv in wdtt_servers: await _wdtt_api_call(srv, "DELETE", "/api/passwords/remove", {"password": current_password}) conn.execute("UPDATE users SET wdtt_password = NULL, wdtt_expires_at = 0, wdtt_server_id = NULL WHERE id = ?", (user["id"],)) conn.commit() logger.info(f"WDTT password removed for user {user['username']}") return # Берём первый wdtt-сервер server = wdtt_servers[0] server_id = servers.index(server) if current_password: # Обновляем expiry result = await _wdtt_api_call(server, "POST", "/api/passwords/update", {"password": current_password, "expires_at": expires_at}) if result.get("status") == "ok": conn.execute("UPDATE users SET wdtt_expires_at = ?, wdtt_server_id = ? WHERE id = ?", (expires_at, server_id, user["id"])) conn.commit() logger.info(f"WDTT password updated for {user['username']}, expires: {expires_at}") else: logger.warning(f"Failed to update WDTT password for {user['username']}: {result.get('error')}") else: # Генерируем новый пароль new_password = secrets.token_urlsafe(24) result = await _wdtt_api_call(server, "POST", "/api/passwords/add", {"password": new_password, "expires_at": expires_at}) if result.get("status") == "ok": conn.execute(""" UPDATE users SET wdtt_password = ?, wdtt_expires_at = ?, wdtt_server_id = ? WHERE id = ? """, (new_password, expires_at, server_id, user["id"])) conn.commit() logger.info(f"WDTT password created for {user['username']}") else: logger.warning(f"Failed to create WDTT password for {user['username']}: {result.get('error')}") finally: conn.close() async def remove_wdtt_password(user: dict): """Удаляет wdtt-пароль пользователя""" conn = get_db() try: db_user = conn.execute("SELECT * FROM users WHERE id = ?", (user["id"],)).fetchone() if not db_user: return db_user = dict(db_user) password = db_user.get("wdtt_password") if not password: return for srv in get_wdtt_servers(): await _wdtt_api_call(srv, "DELETE", "/api/passwords/remove", {"password": password}) conn.execute("UPDATE users SET wdtt_password = NULL, wdtt_expires_at = 0, wdtt_server_id = NULL WHERE id = ?", (user["id"],)) conn.commit() logger.info(f"WDTT password removed for {user['username']}") finally: conn.close() def get_remaining_days(user) -> int: if not isinstance(user, dict): user = dict(user) end = user.get("tariff_end_at", 0) if end: return max(0, (end - int(time.time())) // 86400) return user.get("tariff_days_remaining", 0) def load_json(path: str, default: dict) -> dict: if os.path.exists(path): try: with open(path, "r", encoding="utf-8") as f: return json.load(f) except Exception as e: logger.error(f"Error loading {path}: {e}") return default def save_json(path: str, data: dict): try: with open(path, "w", encoding="utf-8") as f: json.dump(data, f, indent=4, ensure_ascii=False) except Exception as e: logger.error(f"Error saving {path}: {e}") def init_db(): conn = sqlite3.connect(USERS_DB) conn.execute(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, subscription_id TEXT UNIQUE NOT NULL, tier TEXT DEFAULT 'free', tariff_days_bought INTEGER DEFAULT 0, tariff_days_remaining INTEGER DEFAULT 0, tariff_end_at INTEGER DEFAULT 0, total_paid_rubles INTEGER DEFAULT 0, traffic_limit_gb INTEGER DEFAULT 0, is_active BOOLEAN DEFAULT 1, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) try: conn.execute("ALTER TABLE users ADD COLUMN tariff_end_at INTEGER DEFAULT 0") except: pass try: conn.execute("ALTER TABLE users ADD COLUMN wdtt_password TEXT DEFAULT NULL") except: pass try: conn.execute("ALTER TABLE users ADD COLUMN wdtt_expires_at INTEGER DEFAULT 0") except: pass try: conn.execute("ALTER TABLE users ADD COLUMN wdtt_server_id INTEGER DEFAULT NULL") except: pass now = int(time.time()) conn.execute("UPDATE users SET tariff_end_at = ? + tariff_days_remaining * 86400 WHERE tariff_end_at = 0 AND tariff_days_remaining > 0", (now,)) conn.commit() conn.close() def get_db(): conn = sqlite3.connect(USERS_DB, timeout=30.0) conn.row_factory = sqlite3.Row return conn def load_configs(): global servers, settings servers = load_json(SERVERS_CONF, {"servers": []}).get("servers", []) settings = load_json(SETTINGS_CONF, { "general": {"title": "ZernProxy", "host": "conn.zernmc.ru", "support_url": ""}, "announcement": "", "shortid_rotation_hours": 11, "tiers": { "free": {"name": "Free", "servers": [], "traffic_limit_gb": 0}, "paid": {"name": "Premium", "servers": [], "traffic_limit_gb": 0} }, "payments": {"donationalerts": {"enabled": False, "api_token": "", "webhook_secret": "", "check_interval_minutes": 5}}, "auto_propagate": {"enabled": False, "interval_minutes": 60} }) logger.info(f"Loaded {len(servers)} servers") init_db() load_configs() def render_template(name: str, **kwargs) -> str: path = os.path.join(WEB_DIR, name) try: with open(path, "r", encoding="utf-8") as f: html = f.read() except FileNotFoundError: logger.error(f"Template not found: {path}") return "Template error" for k, v in kwargs.items(): html = html.replace(f"{{%{k}%}}", str(v)) return html def get_flag_emoji(country_code: str) -> str: if not country_code or len(country_code) < 2: return "" try: return chr(ord(country_code[0].upper()) + 127397) + chr(ord(country_code[1].upper()) + 127397) except: return "" def generate_qr_base64(data: str, size: int = 300) -> str: qr = qrcode.QRCode(version=None, error_correction=qrcode.constants.ERROR_CORRECT_M, box_size=10, border=2) qr.add_data(data) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white", image_factory=StyledPilImage, module_drawer=SquareModuleDrawer()) img = img.resize((size, size), resample=0) buffered = io.BytesIO() img.save(buffered, format="PNG") return base64.b64encode(buffered.getvalue()).decode() def get_logo_base64() -> Optional[str]: if os.path.exists(LOGO_PATH): try: with open(LOGO_PATH, "rb") as f: return base64.b64encode(f.read()).decode() except: pass return None def get_motd() -> str: if not os.path.exists(MOTD_PATH): return "" with open(MOTD_PATH, encoding="utf-8") as f: return f.read().strip() def get_random_hashes(count: int = 4) -> List[str]: """Читает до `count` случайных хешей из hashes.txt""" if not os.path.exists(HASHES_FILE): return [] with open(HASHES_FILE, encoding="utf-8") as f: lines = [line.strip() for line in f if line.strip() and not line.startswith("#")] if not lines: return [] import random return random.sample(lines, min(count, len(lines))) def format_bytes(bytes_val: int) -> str: if bytes_val == 0: return "0 B" sizes = ["B", "KB", "MB", "GB", "TB"] i = 0 val = float(bytes_val) while val >= 1024 and i < len(sizes) - 1: val /= 1024 i += 1 return f"{val:.1f} {sizes[i]}" if val < 100 else f"{int(val)} {sizes[i]}" def _parse_throughput(s: str) -> float: if not s: return 0.0 total_bytes = 0.0 for m in re.finditer(r'([\d.]+)\s*([KMGT])?B/s', s): val = float(m.group(1)) unit = m.group(2) if unit == 'K': val *= 1024 elif unit == 'M': val *= 1024**2 elif unit == 'G': val *= 1024**3 elif unit == 'T': val *= 1024**4 total_bytes += val return total_bytes / 1024 def calc_health_score(checks: dict) -> tuple: cpu = checks.get("CPU", {}).get("value") ram = checks.get("RAM", {}).get("value") disk = checks.get("Disk /", {}).get("value") net_raw = checks.get("Net ↓↑", {}).get("value", "") io_raw = checks.get("Disk I/O", {}).get("value", "") cpu_s = cpu if cpu is not None else 50 ram_s = ram if ram is not None else 50 disk_s = disk if disk is not None else 50 net_kbs = _parse_throughput(net_raw) io_kbs = _parse_throughput(io_raw) if net_kbs < 500: net_s = 0 elif net_kbs < 10000: net_s = (net_kbs - 500) / 9500 * 50 elif net_kbs < 100000: net_s = 50 + (net_kbs - 10000) / 90000 * 30 else: net_s = min(100, 80 + (net_kbs - 100000) / 100000 * 20) if io_kbs < 100: io_s = 0 elif io_kbs < 10000: io_s = (io_kbs - 100) / 9900 * 50 elif io_kbs < 100000: io_s = 50 + (io_kbs - 10000) / 90000 * 30 else: io_s = min(100, 80 + (io_kbs - 100000) / 100000 * 20) score = cpu_s * 0.30 + ram_s * 0.25 + disk_s * 0.20 + net_s * 0.15 + io_s * 0.10 if score < 40: return (round(score), "#10b981", "Отлично") elif score < 70: return (round(score), "#f59e0b", "Нагружен") else: return (round(score), "#f43f5e", "Критично") def get_traffic_stats(sub_id: str) -> dict: total_up = 0 total_down = 0 by_server = {} for srv in servers: srv_up = 0 srv_down = 0 for inbound in srv.get("inbounds", []): api_host = inbound.get("api_host") api_user = inbound.get("api_user") api_pass = inbound.get("api_pass") inbound_id = inbound.get("id") if not all([api_host, api_user, api_pass, inbound_id]): continue try: api = Api(host=api_host, username=api_user, password=api_pass, use_tls_verify=False) api.login() inbounds = api.inbound.get_list() for ib in inbounds: if ib.id == inbound_id and ib.client_stats: for client in ib.client_stats: if getattr(client, 'sub_id', '') == sub_id: srv_up += client.up or 0 srv_down += client.down or 0 except: pass if srv_up or srv_down: by_server[srv["name"]] = {"up": srv_up, "down": srv_down} total_up += srv_up total_down += srv_down return {"total_up": total_up, "total_down": total_down, "by_server": by_server} def generate_sub_id(length: int = 16) -> str: return ''.join(secrets.choice('abcdefghijklmnopqrstuvwxyz0123456789') for _ in range(length)) async def fetch_vless_links(url: str) -> List[str]: async with httpx.AsyncClient(verify=False, timeout=10.0, follow_redirects=True) as client: try: resp = await client.get(url) if resp.status_code == 404 or resp.status_code == 400: logger.warning(f"Sub not found on server: {url}") return [] if resp.status_code == 200: content = resp.text.strip() try: decoded = base64.b64decode(content).decode('utf-8') links = re.findall(r'([a-z]+://[^\s\n]+)', decoded) if links: return links except: pass return re.findall(r'([a-z]+://[^\s\n]+)', content) except Exception as e: logger.error(f"Fetch error: {e}") return [] def find_user_on_servers(sub_id: str) -> Optional[str]: """Ищет юзера на 3x-UI серверах по subId, возвращает username""" logger.info(f"Looking for sub_id: {sub_id}") for srv in servers: for inbound in srv.get("inbounds", []): api_host = inbound.get("api_host") api_user = inbound.get("api_user") api_pass = inbound.get("api_pass") inbound_id = inbound.get("id") if not all([api_host, api_user, api_pass, inbound_id]): continue try: api = Api(host=api_host, username=api_user, password=api_pass, use_tls_verify=False) api.login() inbounds = api.inbound.get_list() for ib in inbounds: if ib.id == inbound_id and ib.client_stats: for client in ib.client_stats: client_subid = getattr(client, 'sub_id', '') or '' logger.info(f"Found client: {client.email} sub_id: {client_subid}") if client_subid == sub_id: email = client.email or "" if '@' in email: username = email.split('@')[0] if '_' in username: username = username.split('_')[0] logger.info(f"Migrated user: {username} (found on {srv['name']}/{inbound['name']})") return username except Exception as e: logger.error(f"Error checking {srv['name']}/{inbound['name']}: {e}") return None def get_user_tier(sub_id: str) -> Tuple[Optional[dict], str]: conn = get_db() try: user = conn.execute("SELECT * FROM users WHERE subscription_id = ?", (sub_id,)).fetchone() if not user: username = find_user_on_servers(sub_id) if username: try: conn.execute(""" INSERT INTO users (username, subscription_id, tier, tariff_days_bought, tariff_days_remaining, total_paid_rubles, traffic_limit_gb, is_active) VALUES (?, ?, 'free', 0, 0, 0, 0, 1) """, (username, sub_id)) conn.commit() user = conn.execute("SELECT * FROM users WHERE subscription_id = ?", (sub_id,)).fetchone() logger.info(f"User migrated: {username}") except sqlite3.IntegrityError: pass if not user: return None, "free" user = dict(user) if user['tier'] == 'paid' and get_remaining_days(user) <= 0: conn.execute("UPDATE users SET tier = 'free' WHERE id = ?", (user['id'],)) conn.commit() user['tier'] = 'free' return user, user['tier'] finally: conn.close() def get_servers_for_tier(tier: str) -> List[dict]: result = [] for srv in servers: if not srv.get("is_active"): continue result.append(srv) return result def deduplicate_inbounds(servers_list: List[dict], tier: str) -> List[Tuple[dict, dict]]: seen = set() result = [] for srv in servers_list: for inbound in srv.get("inbounds", []): if tier == "free" and not inbound.get("is_free", True): continue key = (srv["name"], inbound.get("name", "")) if key not in seen: seen.add(key) result.append((srv, inbound)) return result @asynccontextmanager async def lifespan(app): ap_cfg = settings.get("auto_propagate", {}) if ap_cfg.get("enabled", False): interval = max(10, ap_cfg.get("interval_minutes", 60)) * 60 logger.info(f"Auto-propagate enabled, interval={interval}s") loop = asyncio.get_event_loop() async def _run(): while True: for srv in servers: if not srv.get("is_active", True): continue try: result = await loop.run_in_executor(None, propagate_server_sync, srv["name"]) if result.get("added", 0) or result.get("failed", 0): logger.info(f"Auto-propagate {srv['name']}: +{result.get('added',0)} added, {result.get('failed',0)} failed, {result.get('skipped',0)} skipped") except: logger.exception(f"Auto-propagate error on {srv.get('name','?')}") await asyncio.sleep(interval) task = asyncio.create_task(_run()) yield if ap_cfg.get("enabled", False): task.cancel() app = FastAPI(title="ZernProxy Manager", docs_url=None, redoc_url=None, lifespan=lifespan) app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) app.mount("/web", StaticFiles(directory=WEB_DIR), name="web") ADMIN_USER = settings.get("admin", {}).get("username", "admin") ADMIN_PASS = settings.get("admin", {}).get("password", "") def check_admin(request: Request) -> bool: if not ADMIN_PASS: return True token = request.cookies.get("admin_token", "") if token == ADMIN_PASS: return True auth = request.headers.get("Authorization", "") if auth.startswith("Basic "): try: decoded = base64.b64decode(auth[6:]).decode() u, p = decoded.split(":", 1) if u == ADMIN_USER and p == ADMIN_PASS: return True except: pass return False @app.get("/admin/login") async def admin_login_page(): return HTMLResponse(content=render_template("admin_login.html", title="Вход", error_msg="")) @app.post("/admin/login") async def admin_login(request: Request): data = await request.form() if data.get("username") == ADMIN_USER and data.get("password") == ADMIN_PASS: resp = RedirectResponse(url="/admin/users", status_code=303) resp.set_cookie(key="admin_token", value=ADMIN_PASS, max_age=86400 * 7, httponly=True) return resp return HTMLResponse(content=render_template("admin_login.html", title="Вход", error_msg='
Неверный пароль
'), status_code=401) @app.get("/sub/{subscription_id}") async def get_subscription(request: Request, subscription_id: str, format: str = Query("base64", pattern="^(json|base64|raw)$")): accept = request.headers.get("accept", "") if "text/html" in accept and format == "base64": return await get_web_page(subscription_id) user, tier = get_user_tier(subscription_id) if not user: raise HTTPException(404, "User not found") if not user.get("is_active"): raise HTTPException(403, "User is disabled") tier_config = settings.get("tiers", {}).get(tier, {}) servers_for_tier = get_servers_for_tier(tier) inbounds = deduplicate_inbounds(servers_for_tier, tier) if not inbounds: raise HTTPException(404, "No servers available") all_links = get_cached_links(subscription_id) if all_links is None: all_links = [] seen_links = set() servers_processed = set() server_links = {} for srv, inbound in inbounds: srv_name = srv["name"] if srv_name not in servers_processed: servers_processed.add(srv_name) sub_path = srv["sub_path"].format(sub_id=subscription_id) url = f"{srv['subscription_url'].rstrip('/')}{sub_path}" server_links[srv_name] = await fetch_vless_links(url) links = server_links.get(srv_name, []) srv_inbounds = [ib for s, ib in inbounds if s["name"] == srv_name] try: link_idx = srv_inbounds.index(inbound) except ValueError: continue if link_idx >= len(links): continue link = links[link_idx] clean_link = link.split('#')[0] if clean_link in seen_links: continue seen_links.add(clean_link) remark = f"{get_flag_emoji(srv.get('country', ''))} {srv_name.upper()} ({inbound['name']})" all_links.append(f"{clean_link}#{remark}") set_cached_links(subscription_id, all_links) if not all_links: raise HTTPException(404, "No links found") if format == "json": return {"links": all_links, "count": len(all_links), "tier": tier} lines = [] motd_text = get_motd() announce_header = "" if motd_text: announce_header = f"base64:{base64.b64encode(motd_text.encode()).decode()}" elif settings.get("announcement"): announce_header = f"base64:{base64.b64encode(settings['announcement'].encode()).decode()}" host = settings.get("general", {}).get("host", "conn.zernmc.ru") title = settings.get("general", {}).get("title", "ZernProxy") support_url = settings.get("general", {}).get("support_url", "") update_interval = 12 web_url = f"https://{host}/sub/{subscription_id}" lines.append(f"#profile-web-page-url: {web_url}") lines.append(f"#profile-title: {title}") lines.append(f"#profile-update-interval: {update_interval}") lines.append("#hide-settings: 1") if support_url: lines.append(f"#support-url: {support_url}") expire_ts = user.get("tariff_end_at", 0) or 0 rem_days = get_remaining_days(user) if rem_days > 0: if not expire_ts: expire_ts = int((datetime.now() + timedelta(days=rem_days)).timestamp()) lines.append("#sub-expire: 1") if support_url: lines.append(f"#sub-expire-button-link: {support_url}") traffic_limit = user.get("traffic_limit_gb") or tier_config.get("traffic_limit_gb", 0) traffic_limit_bytes = traffic_limit * 1073741824 if traffic_limit > 0 else 0 traffic = get_cached_traffic(subscription_id) if not traffic: traffic = get_traffic_stats(subscription_id) set_cached_traffic(subscription_id, traffic) upload = traffic.get("total_up", 0) download = traffic.get("total_down", 0) lines.append(f"#subscription-userinfo: upload={upload}; download={download}; total={traffic_limit_bytes}; expire={expire_ts}") if announce_header: lines.append(f"#announce: {announce_header}") lines.extend(all_links) content = "\n".join(lines) headers = { "Profile-Title": title, "Profile-Update-Interval": str(update_interval), "Profile-Web-Page-Url": web_url, "Subscription-Userinfo": f"upload={upload}; download={download}; total={traffic_limit_bytes}; expire={expire_ts}", "Content-Disposition": f"attachment; filename={title}_{user['username']}", } if announce_header: headers["Announce"] = announce_header if support_url: headers["Support-Url"] = support_url headers["Hide-Settings"] = "1" if expire_ts: headers["Sub-Expire"] = "1" if support_url: headers["Sub-Expire-Button-Link"] = support_url # WDTT-специфичные заголовки для клиента ZernProxy is_wdtt_client = request.query_params.get("client") == "wdtt" or "WDTT" in request.headers.get("User-Agent", "") if is_wdtt_client or user.get("wdtt_password"): wdtt_servers = get_wdtt_servers() if wdtt_servers: server_list = [] for s in wdtt_servers: server_list.append({ "name": s.get("name", ""), "host": s.get("wdtt_host", ""), "dtls_port": s.get("wdtt_dtls_port", 56000), "wg_port": s.get("wdtt_wg_port", 56001), "country": s.get("country", ""), }) headers["X-WDTT-Servers"] = json.dumps(server_list) if user.get("wdtt_password"): headers["X-WDTT-Credentials"] = json.dumps({ "password": user["wdtt_password"], "expires_at": user.get("wdtt_expires_at", 0), "server_id": user.get("wdtt_server_id", 0), }) # ZernHash: случайные хеши для клиента if is_wdtt_client or "ZernHash" in request.headers.get("User-Agent", ""): hashes = get_random_hashes(4) if hashes: headers["X-WDTT-Hashes"] = json.dumps(hashes) if format == "base64": return Response(content=base64.b64encode(content.encode()).decode(), media_type="text/plain; charset=utf-8", headers=headers) return Response(content=content, media_type="text/plain; charset=utf-8", headers=headers) async def get_web_page(subscription_id: str): user, tier = get_user_tier(subscription_id) if not user: raise HTTPException(404, "User not found") tier_config = settings.get("tiers", {}).get(tier, {}) servers_for_tier = get_servers_for_tier(tier) inbounds = deduplicate_inbounds(servers_for_tier, tier) host = settings.get("general", {}).get("host", "conn.zernmc.ru") title = settings.get("general", {}).get("title", "ZernProxy") announcement = get_motd() or settings.get("announcement", "") da_config = settings.get("payments", {}).get("donationalerts", {}) sub_url = f"https://{host}/sub/{subscription_id}" qr_base64 = generate_qr_base64(sub_url, size=300) logo_base64 = get_logo_base64() logo_html = f'Logo' if logo_base64 else '
' announcement_html = f'
{announcement}
' if announcement else '' tier_color = "#4CAF50" if tier == "paid" else "#757575" tier_name = tier_config.get("name", "Free") tier_badge = f'{tier_name}' days_remaining = get_remaining_days(user) days_info = f"

⏳ Осталось дней: {days_remaining}

" if tier == "paid" and days_remaining > 0 else "" traffic = get_cached_traffic(subscription_id) if not traffic: traffic = get_traffic_stats(subscription_id) set_cached_traffic(subscription_id, traffic) traffic_limit_val = user.get('traffic_limit_gb') or tier_config.get('traffic_limit_gb', 0) traffic_limit_str = "∞" if traffic_limit_val == 0 else f"{traffic_limit_val} GB" traffic_info = f"

📊 Лимит: {traffic_limit_str}

⬆️ {format_bytes(traffic['total_up'])} | ⬇️ {format_bytes(traffic['total_down'])}

" traffic_details = "" if traffic.get("by_server"): for srv_name, data in traffic["by_server"].items(): if data["up"] or data["down"]: traffic_details += f'
{srv_name}: ⬆ {format_bytes(data["up"])} ⬇ {format_bytes(data["down"])}
' info_html = f'
{days_info}{traffic_info}{traffic_details}
' if days_info or traffic_info or traffic_details else '' servers_html = "".join(f'{get_flag_emoji(srv.get("country", ""))} {srv["name"].upper()}' for srv in servers_for_tier) support_btn = "" if da_config.get("enabled"): da_url = da_config.get("url", "#") support_btn = f''' Поддержать проект ''' html = render_template("sub.html", title=title, logo_html=logo_html, tier_badge=tier_badge, announcement_html=announcement_html, info_html=info_html, servers_html=servers_html, qr_base64=qr_base64, sub_url=sub_url, support_btn=support_btn) return HTMLResponse(content=html) @app.post("/payment/webhook/donationalerts") async def webhook_donationalerts(request: Request): da_config = settings.get("payments", {}).get("donationalerts", {}) webhook_secret = da_config.get("webhook_secret", "") if webhook_secret: provided = request.headers.get("X-Webhook-Secret", "") if provided != webhook_secret: return JSONResponse({"error": "Invalid secret"}, status_code=403) try: data = await request.json() except: return JSONResponse({"error": "Invalid JSON"}, status_code=400) amount = data.get("amount", 0) username = data.get("username", "") message = data.get("message", "") donation_id = data.get("id", 0) logger.info(f"DA webhook: id={donation_id} amount={amount} username={username}") user = None message_parts = message.split() if message else [] for part in message_parts: conn = get_db() try: if part.isdigit(): user = conn.execute("SELECT * FROM users WHERE id = ?", (int(part),)).fetchone() if not user: user = conn.execute("SELECT * FROM users WHERE username = ? COLLATE NOCASE", (part,)).fetchone() if user: break finally: conn.close() if not user and username: conn = get_db() try: user = conn.execute("SELECT * FROM users WHERE username = ? COLLATE NOCASE", (username,)).fetchone() finally: conn.close() if not user: return JSONResponse({"status": "ignored", "reason": "user_not_found"}) tiers_config = settings.get("tiers", {}) tier = None days = 0 for tier_name, tier_data in tiers_config.items(): prices = tier_data.get("prices", {}) tier_days = tier_data.get("days", {}) for price_key, price_val in prices.items(): if amount == price_val: tier = tier_name days = tier_days.get(price_key, 30) break if tier: break if not tier: return JSONResponse({"status": "ignored", "reason": "amount_not_recognized"}) conn = get_db() try: now_ts = int(time.time()) conn.execute(""" UPDATE users SET tier = ?, tariff_days_bought = tariff_days_bought + ?, tariff_end_at = MAX(COALESCE(tariff_end_at, 0), ?) + ? * 86400, total_paid_rubles = total_paid_rubles + ? WHERE id = ? """, (tier, days, now_ts, days, amount, user["id"])) conn.commit() updated = conn.execute("SELECT * FROM users WHERE id = ?", (user["id"],)).fetchone() if updated: asyncio.create_task(sync_wdtt_password(dict(updated))) finally: conn.close() logger.info(f"VPN payment: {username} paid {amount} RUB, +{days} days") return JSONResponse({"status": "ok", "user": user["username"], "days": days}) @app.get("/admin/users") async def admin_users(request: Request): if not check_admin(request): return HTMLResponse(content=render_template("admin_login.html", title="Вход", error_msg="")) conn = get_db() try: users = conn.execute("SELECT * FROM users ORDER BY created_at DESC").fetchall() finally: conn.close() da_config = settings.get("payments", {}).get("donationalerts", {}) title = settings.get("general", {}).get("title", "ZernProxy") tier_options = "".join( f'' for k, v in settings.get("tiers", {}).items()) users_rows = "" for u in users: traffic = f'{u["traffic_limit_gb"]} GB' if u['traffic_limit_gb'] > 0 else '∞' active = '✓' if u['is_active'] else '✗' tier_display = u['tier'].upper() rem_days = get_remaining_days(u) users_rows += f''' {u['id']} {u['username']} {u['subscription_id']} {tier_display} {rem_days} {u['total_paid_rubles']}₽ {traffic} {active} {u['created_at'][:10]} ''' html = render_template("admin_users.html", title=title, user_count=len(users), users_rows=users_rows, tier_options=tier_options) return HTMLResponse(content=html) @app.get("/admin/dashboard") async def admin_dashboard(request: Request): if not check_admin(request): return HTMLResponse(content=render_template("admin_login.html", title="Вход", error_msg="")) conn = get_db() try: total_users = conn.execute("SELECT COUNT(*) as c FROM users").fetchone()["c"] free_users = conn.execute("SELECT COUNT(*) as c FROM users WHERE tier='free'").fetchone()["c"] paid_users = conn.execute("SELECT COUNT(*) as c FROM users WHERE tier='paid'").fetchone()["c"] test_users = conn.execute("SELECT COUNT(*) as c FROM users WHERE tier='test'").fetchone()["c"] total_revenue = conn.execute("SELECT SUM(total_paid_rubles) as s FROM users").fetchone()["s"] or 0 total_donations = conn.execute("SELECT SUM(total_paid_rubles) as s FROM users WHERE total_paid_rubles > 0").fetchone()["s"] or 0 finally: conn.close() online_count = 0 for srv in servers: for inbound in srv.get("inbounds", []): try: api = Api(host=inbound["api_host"], username=inbound["api_user"], password=inbound["api_pass"], use_tls_verify=False) api.login() online = api.client.online() online_count += len(online) except: pass title = settings.get("general", {}).get("title", "ZernProxy") servers_rows = "".join( f'
{get_flag_emoji(s.get("country",""))} {s["name"].upper()}{len(s.get("inbounds",[]))} inbound
' for s in servers) html = render_template("admin_dashboard.html", title=title, total_users=total_users, free_users=free_users, paid_users=paid_users, test_users=test_users, online_count=online_count, total_revenue=total_revenue, server_count=len(servers), inbound_count=sum(len(s.get("inbounds",[])) for s in servers), servers_rows=servers_rows) return HTMLResponse(content=html) @app.get("/") async def home_page(): title = settings.get("general", {}).get("title", "ZernProxy") da_url = settings.get("payments", {}).get("donationalerts", {}).get("url", "") statuses = await fetch_servers_status() srv_cards = "" for i, s in enumerate(statuses): chk = s.get("checks", {}) cpu = chk.get("CPU", {}).get("value") ram = chk.get("RAM", {}).get("value") disk = chk.get("Disk /", {}).get("value") srv_name = s.get("server_name", s["name"].upper()) delay = 0.45 + i * 0.15 online = s.get("online", False) srv_inbounds = [] for srv_cfg in servers: if srv_cfg["name"] == s["name"]: srv_inbounds = srv_cfg.get("inbounds", []) all_ib = srv_inbounds has_free = any(ib.get("is_free", True) for ib in all_ib) has_paid = any(not ib.get("is_free", True) for ib in all_ib) break else: has_free = True has_paid = True if has_free and has_paid: badge = 'Free+Premium' elif has_free: badge = 'Free' else: badge = 'Premium' if online: _, hc, hl = calc_health_score(chk) s1 = cpu if cpu is not None else 0 s2 = ram if ram is not None else 0 s3 = disk if disk is not None else 0 stats = f'
CPU {s1:.0f}% · RAM {s2:.0f}% · DSK {s3:.0f}%
' else: hc = "#475569" hl = "Offline" stats = '
Offline
' inbound_cards = "" for ib in srv_inbounds: ib_name = ib.get("name", "unknown") ib_type = ib.get("network", ib.get("flow", "")) inbound_cards += f'
{ib_name}{ib_type}
\n' ru_note = "" if s.get("country", "").lower() == "ru" and online: ru_note = '
★ Нет рекламы в YouTube · Адаптирован к будущему ограничению 15 ГБ зарубежного трафика через Proxy
' srv_cards += f'''
{get_flag_emoji(s.get("country",""))} {srv_name} {badge} {hl}
{stats}
{inbound_cards}
{ru_note}
''' return HTMLResponse(content=render_template("home.html", title=title, da_url=da_url, srv_cards=srv_cards)) async def fetch_servers_status() -> list: results = [] async with httpx.AsyncClient(timeout=8.0, verify=False) as client: for srv in servers: url = srv.get("status_url", "") entry = {"name": srv["name"], "country": srv.get("country",""), "checks": {}, "online": False, "uptime": ""} if not url: results.append(entry) continue try: resp = await client.get(url) if resp.status_code == 200: data = resp.json() entry["online"] = True entry["server_name"] = data.get("server_name", "") entry["checks"] = data.get("checks", {}) else: entry["checks"]["error"] = {"value": f"HTTP {resp.status_code}"} except Exception as e: entry["checks"]["error"] = {"value": str(e)[:50]} results.append(entry) return results @app.get("/webhook/tg") async def tg_webhook(): bot_cfg = settings.get("bot", {}) if bot_cfg.get("enabled"): return JSONResponse({"status": "ok", "bot": True, "message": "Telegram bot is ready"}) return JSONResponse({"status": "ok", "bot": False, "message": "Telegram bot not configured"}) def create_3xui_client(username: str, sub_id: str, inbound: dict, traffic_gb: int = 0) -> dict: """Создаёт клиента на 3x-UI сервере""" api_host = inbound.get("api_host") api_user = inbound.get("api_user") api_pass = inbound.get("api_pass") inbound_id = inbound.get("id") if not all([api_host, api_user, api_pass, inbound_id]): return {"success": False, "error": "missing_credentials"} try: api = Api(host=api_host, username=api_user, password=api_pass, use_tls_verify=False) api.login() inbound_name = inbound.get('name', 'default') email = f"{username}_{inbound_name}@vless.local" total_bytes = traffic_gb * 1073741824 if traffic_gb > 0 else 0 logger.info(f"Creating client: email={email}, total_bytes={total_bytes}, inbound={inbound_name}") from py3xui.client import Client if inbound.get("protocol") == "trojan": client = Client( id="", password=str(uuid.uuid4()), email=email, enable=True, total_gb=total_bytes, expiry_time=0, limit_ip=0, subId=sub_id ) else: client = Client( id=str(uuid.uuid4()), email=email, enable=True, total_gb=total_bytes, expiry_time=0, limit_ip=0, subId=sub_id ) try: existing = api.client.get_by_email(email) if existing: logger.info(f"Deleting existing client: {existing.id}") api.client.delete(existing.id, inbound_id) except: pass logger.info(f"About to add client: {client}") api.client.add(inbound_id=inbound_id, clients=[client]) logger.info(f"Client created successfully on {inbound_name}") return {"success": True, "email": email} except Exception as e: logger.error(f"Error creating client: {e}") import traceback logger.error(traceback.format_exc()) return {"success": False, "error": str(e)[:150]} def delete_3xui_client(username: str, sub_id: str, inbound: dict) -> dict: """Удаляет клиента с 3x-UI сервера""" api_host = inbound.get("api_host") api_user = inbound.get("api_user") api_pass = inbound.get("api_pass") inbound_id = inbound.get("id") if not all([api_host, api_user, api_pass, inbound_id]): return {"success": False, "error": "missing_credentials"} try: api = Api(host=api_host, username=api_user, password=api_pass, use_tls_verify=False) api.login() inbounds = api.inbound.get_list() for ib in inbounds: if ib.id == inbound_id and ib.client_stats: for client in ib.client_stats: if getattr(client, 'sub_id', '') == sub_id: api.client.delete(inbound_id, client.uuid) logger.info(f"Deleted client from {inbound.get('name')}") return {"success": True} return {"success": True, "error": "not_found"} except Exception as e: return {"success": False, "error": str(e)[:100]} def fetch_server_sub_ids(srv: dict) -> dict: result = {} srv_name = srv.get("name", "?") for inbound in srv.get("inbounds", []): api_host = inbound.get("api_host") api_user = inbound.get("api_user") api_pass = inbound.get("api_pass") inbound_id = inbound.get("id") ib_name = inbound.get("name", "?") if not all([api_host, api_user, api_pass, inbound_id]): continue try: from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout def _fetch(): api = Api(host=api_host, username=api_user, password=api_pass, use_tls_verify=False) api.login() ids = set() for ib in api.inbound.get_list(): if ib.id == inbound_id and ib.client_stats: for client in ib.client_stats: sid = getattr(client, 'sub_id', '') or '' if sid: ids.add(sid) return ids with ThreadPoolExecutor(max_workers=1) as ex: future = ex.submit(_fetch) try: fetched = future.result(timeout=20) result[inbound_id] = fetched logger.info(f"fetch_server_sub_ids {srv_name}/{ib_name}: got {len(fetched)} sub_ids") except FutureTimeout: logger.warning(f"Timeout fetching clients from {srv_name}/{ib_name}") except Exception as e: logger.warning(f"fetch_server_sub_ids {srv_name}/{ib_name} error: {e}") logger.info(f"fetch_server_sub_ids {srv_name}: inbound count {len(result)}") return result def propagate_server_sync(server_name: str) -> dict: target_srv = next((s for s in servers if s["name"] == server_name), None) if not target_srv: return {"error": "Server not found"} conn = get_db() try: users = conn.execute("SELECT username, subscription_id, traffic_limit_gb, is_active FROM users WHERE is_active = 1").fetchall() finally: conn.close() other_servers = [s for s in servers if s.get("is_active", True) and s["name"] != server_name] threshold = max(1, len(other_servers) // 2 + 1) target_ib_ids = fetch_server_sub_ids(target_srv) other_ids = {} for s in other_servers: ib_dict = fetch_server_sub_ids(s) flat = set() for ids in ib_dict.values(): flat.update(ids) other_ids[s["name"]] = flat total = len(users) added = skipped = failed = 0 inbound_stats = {} for ib in target_srv.get("inbounds", []): inbound_stats[ib.get("name", f"id_{ib.get('id','?')}")] = 0 results = [] for u in users: sub_id = u["subscription_id"] username = u["username"] count = sum(1 for s in other_servers if sub_id in other_ids.get(s["name"], set())) if count < threshold: skipped += 1 results.append({"username": username, "skipped": True, "servers": count, "total": len(other_servers)}) continue user_added = False for inbound in target_srv.get("inbounds", []): ib_id = inbound.get("id") ib_name = inbound.get("name", "?") existing = target_ib_ids.get(ib_id, set()) if sub_id in existing: continue r = create_3xui_client(username, sub_id, inbound, u["traffic_limit_gb"] or 0) if r.get("success"): user_added = True inbound_stats[ib_name] = inbound_stats.get(ib_name, 0) + 1 clear_cache(sub_id) else: failed += 1 results.append({"username": username, "inbound": ib_name, "error": r.get("error", "")}) if user_added: added += 1 results.append({"username": username, "added": True}) already = sum(1 for u in users if u["subscription_id"] in {sid for ids in target_ib_ids.values() for sid in ids}) return {"server": server_name, "total": total, "already_on_server": already, "added": added, "skipped": skipped, "failed": failed, "threshold": f"{threshold}/{len(other_servers)}", "per_inbound_added": inbound_stats, "results": results} @app.post("/admin/api/propagate/{server_name}") async def propagate_server(request: Request, server_name: str): if not check_admin(request): return JSONResponse({"error": "Unauthorized"}, status_code=401) result = propagate_server_sync(server_name) if "error" in result: return JSONResponse(result, status_code=404) return JSONResponse(result) @app.post("/admin/api/users") async def create_user(request: Request, data: dict): if not check_admin(request): return JSONResponse({"error": "Unauthorized"}, status_code=401) username = data.get("username", "").strip() if not username: return JSONResponse({"error": "username required"}, status_code=400) sub_id = generate_sub_id() conn = get_db() traffic_gb = int(data.get("traffic_limit_gb", 0) or 0) logger.info(f"Creating user: username={username}, traffic_gb={traffic_gb}") results = [] for srv in servers: srv_result = {"server": srv["name"], "inbounds": []} for inbound in srv.get("inbounds", []): result = create_3xui_client(username, sub_id, inbound, traffic_gb) srv_result["inbounds"].append({ "name": inbound.get("name"), "success": result["success"], "error": result.get("error", "") }) results.append(srv_result) try: conn.execute(""" INSERT INTO users (username, subscription_id, tier, tariff_days_bought, tariff_days_remaining, total_paid_rubles, traffic_limit_gb, is_active) VALUES (?, ?, 'free', 0, 0, 0, 0, 1) """, (username, sub_id)) conn.commit() except sqlite3.IntegrityError: conn.close() return JSONResponse({"error": "username exists"}, status_code=400) finally: conn.close() success_count = sum(1 for r in results for ib in r["inbounds"] if ib["success"]) total_count = sum(len(r["inbounds"]) for r in results) return JSONResponse({ "status": "ok", "username": username, "subscription_id": sub_id, "subscription_url": f"https://{settings.get('general', {}).get('host', 'conn.zernmc.ru')}/sub/{sub_id}", "results": results, "summary": f"{success_count}/{total_count} инбаундов создано" }) @app.post("/admin/api/users/update") async def update_user(request: Request, data: dict): if not check_admin(request): return JSONResponse({"error": "Unauthorized"}, status_code=401) logger.info(f"RAW data received: {data}") user_id = int(data.get("id", 0)) tier = str(data.get("tier", "free")) tariff_days_remaining = int(data.get("tariff_days_remaining", 0) or 0) tariff_end_at = int(time.time()) + tariff_days_remaining * 86400 if tariff_days_remaining > 0 else 0 traffic_limit_gb = int(data.get("traffic_limit_gb", 0) or 0) is_active = 1 if data.get("is_active") in [True, "true", "on", "1", 1] else 0 logger.info(f"Update user: id={user_id}, tier='{tier}', days={tariff_days_remaining}, traffic={traffic_limit_gb}, active={is_active}") conn = get_db() try: conn.execute(""" UPDATE users SET tier = ?, tariff_days_remaining = ?, tariff_end_at = ?, traffic_limit_gb = ?, is_active = ? WHERE id = ? """, (tier, tariff_days_remaining, tariff_end_at, traffic_limit_gb, is_active, user_id)) conn.commit() updated = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone() logger.info(f"After update DB: {dict(updated)}") finally: conn.close() if updated: updated = dict(updated) updated["tier"] = tier clear_cache(updated["subscription_id"]) asyncio.create_task(sync_wdtt_password(updated)) return JSONResponse({"status": "ok"}) @app.post("/admin/api/users/delete") async def delete_user(request: Request, data: dict): if not check_admin(request): return JSONResponse({"error": "Unauthorized"}, status_code=401) user_id = data.get("id") username = data.get("username", "") conn = get_db() try: user = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone() if not user: return JSONResponse({"error": "user not found"}, status_code=404) user = dict(user) sub_id = user["subscription_id"] for srv in servers: for inbound in srv.get("inbounds", []): result = delete_3xui_client(username, sub_id, inbound) asyncio.create_task(remove_wdtt_password(user)) conn.execute("DELETE FROM users WHERE id = ?", (user_id,)) conn.commit() finally: conn.close() return JSONResponse({"status": "ok", "message": "User deleted from all servers"}) async def rotate_shortids(): while True: try: rotation_hours = settings.get("shortid_rotation_hours", 11) await asyncio.sleep(rotation_hours * 3600) for srv in servers: for inbound in srv.get("inbounds", []): api_host = inbound.get("api_host") api_user = inbound.get("api_user") api_pass = inbound.get("api_pass") if not all([api_host, api_user, api_pass]): continue try: api = Api(host=api_host, username=api_user, password=api_pass, use_tls_verify=False) api.login() new_shortid = secrets.token_hex(4) inbounds = api.inbound.get_inbounds() for ib in inbounds: if ib.id == inbound.get("id"): ib.shortid = new_shortid api.inbound.update_inbound(ib) logger.info(f"ShortID rotated for {srv['name']}/{inbound['name']}: {new_shortid}") break except Exception as e: logger.error(f"ShortID rotation error {srv['name']}/{inbound['name']}: {e}") except Exception as e: logger.error(f"ShortID rotation error: {e}") await asyncio.sleep(3600) def update_3xui_expiry(sub_id: str, days: int): """Обновляет expiry_time на всех 3x-UI серверах""" import time expiry_timestamp = int(time.time() + (days * 86400)) for srv in servers: for inbound in srv.get("inbounds", []): api_host = inbound.get("api_host") api_user = inbound.get("api_user") api_pass = inbound.get("api_pass") inbound_id = inbound.get("id") if not all([api_host, api_user, api_pass, inbound_id]): continue try: api = Api(host=api_host, username=api_user, password=api_pass, use_tls_verify=False) api.login() inbounds = api.inbound.get_list() for ib in inbounds: if ib.id == inbound_id and ib.client_stats: for client in ib.client_stats: if getattr(client, 'sub_id', '') == sub_id: client.expiry_time = expiry_timestamp api.client.update(client.uuid, client) logger.info(f"Updated expiry for {srv['name']}/{inbound['name']}: {days} days") break except Exception as e: logger.error(f"Error updating expiry {srv['name']}/{inbound['name']}: {e}") async def poll_donationalerts(): global last_donation_id while True: try: da_config = settings.get("payments", {}).get("donationalerts", {}) if not da_config.get("enabled"): await asyncio.sleep(300) continue api_token = da_config.get("api_token", "") if not api_token: logger.warning("DonationAlerts API token not configured") await asyncio.sleep(300) continue interval = da_config.get("check_interval_minutes", 5) await asyncio.sleep(interval * 60) async with httpx.AsyncClient(timeout=30.0) as client: resp = await client.get( "https://www.donationalerts.com/api/v1/alerts/donations", headers={"Authorization": f"Bearer {api_token}"} ) if resp.status_code != 200: logger.error(f"DA API error: {resp.status_code}") continue data = resp.json() donations = data.get("data", []) for donation in reversed(donations): donation_id = donation.get("id") if donation_id <= last_donation_id: continue amount = donation.get("amount", 0) username = donation.get("username", "") message = donation.get("message", "") tiers_config = settings.get("tiers", {}) tier = None days = 0 for tier_name, tier_data in tiers_config.items(): prices = tier_data.get("prices", {}) tier_days = tier_data.get("days", {}) for price_key, price_val in prices.items(): if amount == price_val: tier = tier_name days = tier_days.get(price_key, 30) break if tier: break if not tier: logger.info(f"DA: ignoring amount {amount} RUB (not in config)") last_donation_id = donation_id continue user = None message_parts = message.split() if message else [] conn = get_db() try: for part in message_parts: if part.isdigit(): user = conn.execute("SELECT * FROM users WHERE id = ?", (int(part),)).fetchone() if not user: user = conn.execute("SELECT * FROM users WHERE username = ? COLLATE NOCASE", (part,)).fetchone() if user: break if not user and username: user = conn.execute("SELECT * FROM users WHERE username = ? COLLATE NOCASE", (username,)).fetchone() if user: now_ts = int(time.time()) current_end = user.get("tariff_end_at", 0) if current_end > now_ts: new_end = current_end + days * 86400 else: new_end = now_ts + days * 86400 conn.execute(""" UPDATE users SET tier = ?, tariff_days_bought = tariff_days_bought + ?, tariff_end_at = ?, total_paid_rubles = total_paid_rubles + ? WHERE id = ? """, (tier, days, new_end, amount, user["id"])) conn.commit() sub_id = user["subscription_id"] update_3xui_expiry(sub_id, days) updated = conn.execute("SELECT * FROM users WHERE id = ?", (user["id"],)).fetchone() if updated: asyncio.create_task(sync_wdtt_password(dict(updated))) logger.info(f"VPN payment: {username} paid {amount} RUB, tier={tier}, +{days} days") finally: conn.close() last_donation_id = donation_id except Exception as e: logger.error(f"DonationAlerts polling error: {e}") await asyncio.sleep(300) @app.get("/health") async def health(): return {"status": "ok", "servers": len(servers), "settings_loaded": bool(settings)} @app.post("/admin/api/reload") async def reload_configs(request: Request): if not check_admin(request): return JSONResponse({"error": "Unauthorized"}, status_code=401) load_configs() clear_cache() return {"status": "ok"} @app.get("/admin/api/rotate-shortids") async def manual_rotate(request: Request): if not check_admin(request): return JSONResponse({"error": "Unauthorized"}, status_code=401) await rotate_shortids() return {"status": "ok"} if __name__ == "__main__": loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.create_task(rotate_shortids()) loop.create_task(poll_donationalerts()) uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")