1452 lines
57 KiB
Python
1452 lines
57 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
ZernProxy Manager v22 - Soft-Tariff with SQLite
|
|
"""
|
|
|
|
import json
|
|
import base64
|
|
import logging
|
|
import asyncio
|
|
import io
|
|
import os
|
|
import qrcode
|
|
import uuid
|
|
import hashlib
|
|
import secrets
|
|
import sqlite3
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
from qrcode.image.styledpil import StyledPilImage
|
|
from qrcode.image.styles.moduledrawers import SquareModuleDrawer
|
|
from typing import List, Dict, Optional, Tuple
|
|
from contextlib import contextmanager, asynccontextmanager
|
|
from fastapi import FastAPI, Response, HTTPException, Query, Request
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, RedirectResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
import uvicorn
|
|
import httpx
|
|
import re
|
|
from py3xui import Api
|
|
import urllib3
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
logger = logging.getLogger("zernproxy")
|
|
|
|
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
SERVERS_CONF = os.path.join(BASE_DIR, "servers.conf")
|
|
SETTINGS_CONF = os.path.join(BASE_DIR, "settings.conf")
|
|
USERS_DB = os.path.join(BASE_DIR, "users.db")
|
|
LOGO_PATH = os.path.join(BASE_DIR, "logo.png")
|
|
MOTD_PATH = os.path.join(BASE_DIR, "motd.txt")
|
|
WEB_DIR = os.path.join(BASE_DIR, "web")
|
|
|
|
servers = []
|
|
settings = {}
|
|
last_donation_id = 0
|
|
|
|
# In-memory cache
|
|
_links_cache: Dict[str, Tuple[List[str], float]] = {}
|
|
_traffic_cache: Dict[str, Tuple[Dict, float]] = {}
|
|
CACHE_TTL_TRAFFIC = 120
|
|
CACHE_TTL_LINKS = 86400 * 7
|
|
|
|
def _get_cached(key: str, cache: dict, ttl: float):
|
|
entry = cache.get(key)
|
|
if entry and time.time() - entry[1] < ttl:
|
|
return entry[0]
|
|
return None
|
|
|
|
def _set_cache(key: str, value, cache: dict):
|
|
cache[key] = (value, time.time())
|
|
|
|
def get_cached_links(sub_id: str):
|
|
return _get_cached(sub_id, _links_cache, CACHE_TTL_LINKS)
|
|
|
|
def set_cached_links(sub_id: str, links: List[str]):
|
|
_set_cache(sub_id, links, _links_cache)
|
|
|
|
def get_cached_traffic(sub_id: str):
|
|
return _get_cached(sub_id, _traffic_cache, CACHE_TTL_TRAFFIC)
|
|
|
|
def set_cached_traffic(sub_id: str, data: dict):
|
|
_set_cache(sub_id, data, _traffic_cache)
|
|
|
|
def clear_cache(sub_id: str = None):
|
|
if sub_id:
|
|
_links_cache.pop(sub_id, None)
|
|
_traffic_cache.pop(sub_id, None)
|
|
else:
|
|
_links_cache.clear()
|
|
_traffic_cache.clear()
|
|
|
|
def get_remaining_days(user) -> int:
|
|
if not isinstance(user, dict):
|
|
user = dict(user)
|
|
end = user.get("tariff_end_at", 0)
|
|
if end:
|
|
return max(0, (end - int(time.time())) // 86400)
|
|
return user.get("tariff_days_remaining", 0)
|
|
|
|
def load_json(path: str, default: dict) -> dict:
|
|
if os.path.exists(path):
|
|
try:
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
except Exception as e:
|
|
logger.error(f"Error loading {path}: {e}")
|
|
return default
|
|
|
|
def save_json(path: str, data: dict):
|
|
try:
|
|
with open(path, "w", encoding="utf-8") as f:
|
|
json.dump(data, f, indent=4, ensure_ascii=False)
|
|
except Exception as e:
|
|
logger.error(f"Error saving {path}: {e}")
|
|
|
|
def init_db():
|
|
conn = sqlite3.connect(USERS_DB)
|
|
conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS users (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
username TEXT UNIQUE NOT NULL,
|
|
subscription_id TEXT UNIQUE NOT NULL,
|
|
tier TEXT DEFAULT 'free',
|
|
tariff_days_bought INTEGER DEFAULT 0,
|
|
tariff_days_remaining INTEGER DEFAULT 0,
|
|
tariff_end_at INTEGER DEFAULT 0,
|
|
total_paid_rubles INTEGER DEFAULT 0,
|
|
traffic_limit_gb INTEGER DEFAULT 0,
|
|
is_active BOOLEAN DEFAULT 1,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
""")
|
|
try:
|
|
conn.execute("ALTER TABLE users ADD COLUMN tariff_end_at INTEGER DEFAULT 0")
|
|
except:
|
|
pass
|
|
now = int(time.time())
|
|
conn.execute("UPDATE users SET tariff_end_at = ? + tariff_days_remaining * 86400 WHERE tariff_end_at = 0 AND tariff_days_remaining > 0", (now,))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def get_db():
|
|
conn = sqlite3.connect(USERS_DB, timeout=30.0)
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
def load_configs():
|
|
global servers, settings
|
|
servers = load_json(SERVERS_CONF, {"servers": []}).get("servers", [])
|
|
settings = load_json(SETTINGS_CONF, {
|
|
"general": {"title": "ZernProxy", "host": "conn.zernmc.ru", "support_url": ""},
|
|
"announcement": "",
|
|
"shortid_rotation_hours": 11,
|
|
"tiers": {
|
|
"free": {"name": "Free", "servers": [], "traffic_limit_gb": 0},
|
|
"paid": {"name": "Premium", "servers": [], "traffic_limit_gb": 0}
|
|
},
|
|
"payments": {"donationalerts": {"enabled": False, "api_token": "", "webhook_secret": "", "check_interval_minutes": 5}},
|
|
"auto_propagate": {"enabled": False, "interval_minutes": 60}
|
|
})
|
|
logger.info(f"Loaded {len(servers)} servers")
|
|
|
|
init_db()
|
|
load_configs()
|
|
|
|
def render_template(name: str, **kwargs) -> str:
|
|
path = os.path.join(WEB_DIR, name)
|
|
try:
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
html = f.read()
|
|
except FileNotFoundError:
|
|
logger.error(f"Template not found: {path}")
|
|
return "Template error"
|
|
for k, v in kwargs.items():
|
|
html = html.replace(f"{{%{k}%}}", str(v))
|
|
return html
|
|
|
|
def get_flag_emoji(country_code: str) -> str:
|
|
if not country_code or len(country_code) < 2:
|
|
return ""
|
|
try:
|
|
return chr(ord(country_code[0].upper()) + 127397) + chr(ord(country_code[1].upper()) + 127397)
|
|
except:
|
|
return ""
|
|
|
|
def generate_qr_base64(data: str, size: int = 300) -> str:
|
|
qr = qrcode.QRCode(version=None, error_correction=qrcode.constants.ERROR_CORRECT_M, box_size=10, border=2)
|
|
qr.add_data(data)
|
|
qr.make(fit=True)
|
|
img = qr.make_image(fill_color="black", back_color="white", image_factory=StyledPilImage, module_drawer=SquareModuleDrawer())
|
|
img = img.resize((size, size), resample=0)
|
|
buffered = io.BytesIO()
|
|
img.save(buffered, format="PNG")
|
|
return base64.b64encode(buffered.getvalue()).decode()
|
|
|
|
def get_logo_base64() -> Optional[str]:
|
|
if os.path.exists(LOGO_PATH):
|
|
try:
|
|
with open(LOGO_PATH, "rb") as f:
|
|
return base64.b64encode(f.read()).decode()
|
|
except:
|
|
pass
|
|
return None
|
|
|
|
def get_motd() -> str:
|
|
if os.path.exists(MOTD_PATH):
|
|
try:
|
|
with open(MOTD_PATH, "r", encoding="utf-8") as f:
|
|
return f.read().strip()
|
|
except:
|
|
pass
|
|
return ""
|
|
|
|
def format_bytes(bytes_val: int) -> str:
|
|
if bytes_val == 0:
|
|
return "0 B"
|
|
sizes = ["B", "KB", "MB", "GB", "TB"]
|
|
i = 0
|
|
val = float(bytes_val)
|
|
while val >= 1024 and i < len(sizes) - 1:
|
|
val /= 1024
|
|
i += 1
|
|
return f"{val:.1f} {sizes[i]}" if val < 100 else f"{int(val)} {sizes[i]}"
|
|
|
|
def _parse_throughput(s: str) -> float:
|
|
if not s:
|
|
return 0.0
|
|
total_bytes = 0.0
|
|
for m in re.finditer(r'([\d.]+)\s*([KMGT])?B/s', s):
|
|
val = float(m.group(1))
|
|
unit = m.group(2)
|
|
if unit == 'K': val *= 1024
|
|
elif unit == 'M': val *= 1024**2
|
|
elif unit == 'G': val *= 1024**3
|
|
elif unit == 'T': val *= 1024**4
|
|
total_bytes += val
|
|
return total_bytes / 1024
|
|
|
|
def calc_health_score(checks: dict) -> tuple:
|
|
cpu = checks.get("CPU", {}).get("value")
|
|
ram = checks.get("RAM", {}).get("value")
|
|
disk = checks.get("Disk /", {}).get("value")
|
|
net_raw = checks.get("Net ↓↑", {}).get("value", "")
|
|
io_raw = checks.get("Disk I/O", {}).get("value", "")
|
|
|
|
cpu_s = cpu if cpu is not None else 50
|
|
ram_s = ram if ram is not None else 50
|
|
disk_s = disk if disk is not None else 50
|
|
|
|
net_kbs = _parse_throughput(net_raw)
|
|
io_kbs = _parse_throughput(io_raw)
|
|
|
|
if net_kbs < 500:
|
|
net_s = 0
|
|
elif net_kbs < 10000:
|
|
net_s = (net_kbs - 500) / 9500 * 50
|
|
elif net_kbs < 100000:
|
|
net_s = 50 + (net_kbs - 10000) / 90000 * 30
|
|
else:
|
|
net_s = min(100, 80 + (net_kbs - 100000) / 100000 * 20)
|
|
|
|
if io_kbs < 100:
|
|
io_s = 0
|
|
elif io_kbs < 10000:
|
|
io_s = (io_kbs - 100) / 9900 * 50
|
|
elif io_kbs < 100000:
|
|
io_s = 50 + (io_kbs - 10000) / 90000 * 30
|
|
else:
|
|
io_s = min(100, 80 + (io_kbs - 100000) / 100000 * 20)
|
|
|
|
score = cpu_s * 0.30 + ram_s * 0.25 + disk_s * 0.20 + net_s * 0.15 + io_s * 0.10
|
|
|
|
if score < 40:
|
|
return (round(score), "#10b981", "Отлично")
|
|
elif score < 70:
|
|
return (round(score), "#f59e0b", "Нагружен")
|
|
else:
|
|
return (round(score), "#f43f5e", "Критично")
|
|
|
|
def get_traffic_stats(sub_id: str) -> dict:
|
|
total_up = 0
|
|
total_down = 0
|
|
by_server = {}
|
|
|
|
for srv in servers:
|
|
srv_up = 0
|
|
srv_down = 0
|
|
for inbound in srv.get("inbounds", []):
|
|
api_host = inbound.get("api_host")
|
|
api_user = inbound.get("api_user")
|
|
api_pass = inbound.get("api_pass")
|
|
inbound_id = inbound.get("id")
|
|
|
|
if not all([api_host, api_user, api_pass, inbound_id]):
|
|
continue
|
|
|
|
try:
|
|
api = Api(host=api_host, username=api_user, password=api_pass, use_tls_verify=False)
|
|
api.login()
|
|
|
|
inbounds = api.inbound.get_list()
|
|
for ib in inbounds:
|
|
if ib.id == inbound_id and ib.client_stats:
|
|
for client in ib.client_stats:
|
|
if getattr(client, 'sub_id', '') == sub_id:
|
|
srv_up += client.up or 0
|
|
srv_down += client.down or 0
|
|
except:
|
|
pass
|
|
|
|
if srv_up or srv_down:
|
|
by_server[srv["name"]] = {"up": srv_up, "down": srv_down}
|
|
total_up += srv_up
|
|
total_down += srv_down
|
|
|
|
return {"total_up": total_up, "total_down": total_down, "by_server": by_server}
|
|
|
|
def generate_sub_id(length: int = 16) -> str:
|
|
return ''.join(secrets.choice('abcdefghijklmnopqrstuvwxyz0123456789') for _ in range(length))
|
|
|
|
async def fetch_vless_links(url: str) -> List[str]:
|
|
async with httpx.AsyncClient(verify=False, timeout=10.0, follow_redirects=True) as client:
|
|
try:
|
|
resp = await client.get(url)
|
|
if resp.status_code == 404 or resp.status_code == 400:
|
|
logger.warning(f"Sub not found on server: {url}")
|
|
return []
|
|
if resp.status_code == 200:
|
|
content = resp.text.strip()
|
|
try:
|
|
decoded = base64.b64decode(content).decode('utf-8')
|
|
links = re.findall(r'([a-z]+://[^\s\n]+)', decoded)
|
|
if links:
|
|
return links
|
|
except:
|
|
pass
|
|
return re.findall(r'([a-z]+://[^\s\n]+)', content)
|
|
except Exception as e:
|
|
logger.error(f"Fetch error: {e}")
|
|
return []
|
|
|
|
def find_user_on_servers(sub_id: str) -> Optional[str]:
|
|
"""Ищет юзера на 3x-UI серверах по subId, возвращает username"""
|
|
logger.info(f"Looking for sub_id: {sub_id}")
|
|
for srv in servers:
|
|
for inbound in srv.get("inbounds", []):
|
|
api_host = inbound.get("api_host")
|
|
api_user = inbound.get("api_user")
|
|
api_pass = inbound.get("api_pass")
|
|
inbound_id = inbound.get("id")
|
|
|
|
if not all([api_host, api_user, api_pass, inbound_id]):
|
|
continue
|
|
|
|
try:
|
|
api = Api(host=api_host, username=api_user, password=api_pass, use_tls_verify=False)
|
|
api.login()
|
|
|
|
inbounds = api.inbound.get_list()
|
|
for ib in inbounds:
|
|
if ib.id == inbound_id and ib.client_stats:
|
|
for client in ib.client_stats:
|
|
client_subid = getattr(client, 'sub_id', '') or ''
|
|
logger.info(f"Found client: {client.email} sub_id: {client_subid}")
|
|
if client_subid == sub_id:
|
|
email = client.email or ""
|
|
if '@' in email:
|
|
username = email.split('@')[0]
|
|
if '_' in username:
|
|
username = username.split('_')[0]
|
|
logger.info(f"Migrated user: {username} (found on {srv['name']}/{inbound['name']})")
|
|
return username
|
|
except Exception as e:
|
|
logger.error(f"Error checking {srv['name']}/{inbound['name']}: {e}")
|
|
|
|
return None
|
|
|
|
def get_user_tier(sub_id: str) -> Tuple[Optional[dict], str]:
|
|
conn = get_db()
|
|
try:
|
|
user = conn.execute("SELECT * FROM users WHERE subscription_id = ?", (sub_id,)).fetchone()
|
|
|
|
if not user:
|
|
username = find_user_on_servers(sub_id)
|
|
if username:
|
|
try:
|
|
conn.execute("""
|
|
INSERT INTO users (username, subscription_id, tier, tariff_days_bought, tariff_days_remaining, total_paid_rubles, traffic_limit_gb, is_active)
|
|
VALUES (?, ?, 'free', 0, 0, 0, 0, 1)
|
|
""", (username, sub_id))
|
|
conn.commit()
|
|
user = conn.execute("SELECT * FROM users WHERE subscription_id = ?", (sub_id,)).fetchone()
|
|
logger.info(f"User migrated: {username}")
|
|
except sqlite3.IntegrityError:
|
|
pass
|
|
|
|
if not user:
|
|
return None, "free"
|
|
|
|
user = dict(user)
|
|
|
|
if user['tier'] == 'paid' and get_remaining_days(user) <= 0:
|
|
conn.execute("UPDATE users SET tier = 'free' WHERE id = ?", (user['id'],))
|
|
conn.commit()
|
|
user['tier'] = 'free'
|
|
|
|
return user, user['tier']
|
|
finally:
|
|
conn.close()
|
|
|
|
def get_servers_for_tier(tier: str) -> List[dict]:
|
|
result = []
|
|
for srv in servers:
|
|
if not srv.get("is_active"):
|
|
continue
|
|
result.append(srv)
|
|
return result
|
|
|
|
def deduplicate_inbounds(servers_list: List[dict], tier: str) -> List[Tuple[dict, dict]]:
|
|
seen = set()
|
|
result = []
|
|
for srv in servers_list:
|
|
for inbound in srv.get("inbounds", []):
|
|
if tier == "free" and not inbound.get("is_free", True):
|
|
continue
|
|
|
|
key = (srv["name"], inbound.get("name", ""))
|
|
if key not in seen:
|
|
seen.add(key)
|
|
result.append((srv, inbound))
|
|
return result
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(app):
|
|
ap_cfg = settings.get("auto_propagate", {})
|
|
if ap_cfg.get("enabled", False):
|
|
interval = max(10, ap_cfg.get("interval_minutes", 60)) * 60
|
|
logger.info(f"Auto-propagate enabled, interval={interval}s")
|
|
loop = asyncio.get_event_loop()
|
|
async def _run():
|
|
while True:
|
|
for srv in servers:
|
|
if not srv.get("is_active", True):
|
|
continue
|
|
try:
|
|
result = await loop.run_in_executor(None, propagate_server_sync, srv["name"])
|
|
if result.get("added", 0) or result.get("failed", 0):
|
|
logger.info(f"Auto-propagate {srv['name']}: +{result.get('added',0)} added, {result.get('failed',0)} failed, {result.get('skipped',0)} skipped")
|
|
except:
|
|
logger.exception(f"Auto-propagate error on {srv.get('name','?')}")
|
|
await asyncio.sleep(interval)
|
|
task = asyncio.create_task(_run())
|
|
yield
|
|
if ap_cfg.get("enabled", False):
|
|
task.cancel()
|
|
|
|
app = FastAPI(title="ZernProxy Manager", docs_url=None, redoc_url=None, lifespan=lifespan)
|
|
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
|
|
app.mount("/web", StaticFiles(directory=WEB_DIR), name="web")
|
|
|
|
ADMIN_USER = settings.get("admin", {}).get("username", "admin")
|
|
ADMIN_PASS = settings.get("admin", {}).get("password", "")
|
|
|
|
def check_admin(request: Request) -> bool:
|
|
if not ADMIN_PASS:
|
|
return True
|
|
token = request.cookies.get("admin_token", "")
|
|
if token == ADMIN_PASS:
|
|
return True
|
|
auth = request.headers.get("Authorization", "")
|
|
if auth.startswith("Basic "):
|
|
try:
|
|
decoded = base64.b64decode(auth[6:]).decode()
|
|
u, p = decoded.split(":", 1)
|
|
if u == ADMIN_USER and p == ADMIN_PASS:
|
|
return True
|
|
except:
|
|
pass
|
|
return False
|
|
|
|
@app.get("/admin/login")
|
|
async def admin_login_page():
|
|
return HTMLResponse(content=render_template("admin_login.html", title="Вход", error_msg=""))
|
|
|
|
@app.post("/admin/login")
|
|
async def admin_login(request: Request):
|
|
data = await request.form()
|
|
if data.get("username") == ADMIN_USER and data.get("password") == ADMIN_PASS:
|
|
resp = RedirectResponse(url="/admin/users", status_code=303)
|
|
resp.set_cookie(key="admin_token", value=ADMIN_PASS, max_age=86400 * 7, httponly=True)
|
|
return resp
|
|
return HTMLResponse(content=render_template("admin_login.html", title="Вход", error_msg='<div class="login-err">Неверный пароль</div>'), status_code=401)
|
|
|
|
@app.get("/sub/{subscription_id}")
|
|
async def get_subscription(request: Request, subscription_id: str, format: str = Query("base64", pattern="^(json|base64|raw)$")):
|
|
accept = request.headers.get("accept", "")
|
|
if "text/html" in accept and format == "base64":
|
|
return await get_web_page(subscription_id)
|
|
|
|
user, tier = get_user_tier(subscription_id)
|
|
if not user:
|
|
raise HTTPException(404, "User not found")
|
|
|
|
if not user.get("is_active"):
|
|
raise HTTPException(403, "User is disabled")
|
|
|
|
tier_config = settings.get("tiers", {}).get(tier, {})
|
|
servers_for_tier = get_servers_for_tier(tier)
|
|
inbounds = deduplicate_inbounds(servers_for_tier, tier)
|
|
|
|
if not inbounds:
|
|
raise HTTPException(404, "No servers available")
|
|
|
|
all_links = get_cached_links(subscription_id)
|
|
if all_links is None:
|
|
all_links = []
|
|
seen_links = set()
|
|
servers_processed = set()
|
|
server_links = {}
|
|
|
|
for srv, inbound in inbounds:
|
|
srv_name = srv["name"]
|
|
|
|
if srv_name not in servers_processed:
|
|
servers_processed.add(srv_name)
|
|
sub_path = srv["sub_path"].format(sub_id=subscription_id)
|
|
url = f"{srv['subscription_url'].rstrip('/')}{sub_path}"
|
|
server_links[srv_name] = await fetch_vless_links(url)
|
|
|
|
links = server_links.get(srv_name, [])
|
|
srv_inbounds = [ib for s, ib in inbounds if s["name"] == srv_name]
|
|
try:
|
|
link_idx = srv_inbounds.index(inbound)
|
|
except ValueError:
|
|
continue
|
|
|
|
if link_idx >= len(links):
|
|
continue
|
|
|
|
link = links[link_idx]
|
|
clean_link = link.split('#')[0]
|
|
if clean_link in seen_links:
|
|
continue
|
|
seen_links.add(clean_link)
|
|
|
|
remark = f"{get_flag_emoji(srv.get('country', ''))} {srv_name.upper()} ({inbound['name']})"
|
|
all_links.append(f"{clean_link}#{remark}")
|
|
|
|
set_cached_links(subscription_id, all_links)
|
|
|
|
if not all_links:
|
|
raise HTTPException(404, "No links found")
|
|
|
|
if format == "json":
|
|
return {"links": all_links, "count": len(all_links), "tier": tier}
|
|
|
|
lines = []
|
|
motd_text = get_motd()
|
|
announce_header = ""
|
|
|
|
if motd_text:
|
|
announce_header = f"base64:{base64.b64encode(motd_text.encode()).decode()}"
|
|
elif settings.get("announcement"):
|
|
announce_header = f"base64:{base64.b64encode(settings['announcement'].encode()).decode()}"
|
|
|
|
host = settings.get("general", {}).get("host", "conn.zernmc.ru")
|
|
title = settings.get("general", {}).get("title", "ZernProxy")
|
|
support_url = settings.get("general", {}).get("support_url", "")
|
|
update_interval = 12
|
|
|
|
web_url = f"https://{host}/sub/{subscription_id}"
|
|
lines.append(f"#profile-web-page-url: {web_url}")
|
|
lines.append(f"#profile-title: {title}")
|
|
lines.append(f"#profile-update-interval: {update_interval}")
|
|
lines.append("#hide-settings: 1")
|
|
if support_url:
|
|
lines.append(f"#support-url: {support_url}")
|
|
|
|
expire_ts = user.get("tariff_end_at", 0) or 0
|
|
rem_days = get_remaining_days(user)
|
|
if rem_days > 0:
|
|
if not expire_ts:
|
|
expire_ts = int((datetime.now() + timedelta(days=rem_days)).timestamp())
|
|
lines.append("#sub-expire: 1")
|
|
if support_url:
|
|
lines.append(f"#sub-expire-button-link: {support_url}")
|
|
|
|
traffic_limit = user.get("traffic_limit_gb") or tier_config.get("traffic_limit_gb", 0)
|
|
traffic_limit_bytes = traffic_limit * 1073741824 if traffic_limit > 0 else 0
|
|
|
|
traffic = get_cached_traffic(subscription_id)
|
|
if not traffic:
|
|
traffic = get_traffic_stats(subscription_id)
|
|
set_cached_traffic(subscription_id, traffic)
|
|
upload = traffic.get("total_up", 0)
|
|
download = traffic.get("total_down", 0)
|
|
|
|
lines.append(f"#subscription-userinfo: upload={upload}; download={download}; total={traffic_limit_bytes}; expire={expire_ts}")
|
|
|
|
if announce_header:
|
|
lines.append(f"#announce: {announce_header}")
|
|
|
|
lines.extend(all_links)
|
|
content = "\n".join(lines)
|
|
|
|
headers = {
|
|
"Profile-Title": title,
|
|
"Profile-Update-Interval": str(update_interval),
|
|
"Profile-Web-Page-Url": web_url,
|
|
"Subscription-Userinfo": f"upload={upload}; download={download}; total={traffic_limit_bytes}; expire={expire_ts}",
|
|
"Content-Disposition": f"attachment; filename={title}_{user['username']}",
|
|
}
|
|
if announce_header:
|
|
headers["Announce"] = announce_header
|
|
if support_url:
|
|
headers["Support-Url"] = support_url
|
|
headers["Hide-Settings"] = "1"
|
|
if expire_ts:
|
|
headers["Sub-Expire"] = "1"
|
|
if support_url:
|
|
headers["Sub-Expire-Button-Link"] = support_url
|
|
|
|
if format == "base64":
|
|
return Response(content=base64.b64encode(content.encode()).decode(), media_type="text/plain; charset=utf-8", headers=headers)
|
|
return Response(content=content, media_type="text/plain; charset=utf-8", headers=headers)
|
|
|
|
async def get_web_page(subscription_id: str):
|
|
user, tier = get_user_tier(subscription_id)
|
|
if not user:
|
|
raise HTTPException(404, "User not found")
|
|
|
|
tier_config = settings.get("tiers", {}).get(tier, {})
|
|
servers_for_tier = get_servers_for_tier(tier)
|
|
inbounds = deduplicate_inbounds(servers_for_tier, tier)
|
|
|
|
host = settings.get("general", {}).get("host", "conn.zernmc.ru")
|
|
title = settings.get("general", {}).get("title", "ZernProxy")
|
|
announcement = get_motd() or settings.get("announcement", "")
|
|
da_config = settings.get("payments", {}).get("donationalerts", {})
|
|
|
|
sub_url = f"https://{host}/sub/{subscription_id}"
|
|
qr_base64 = generate_qr_base64(sub_url, size=300)
|
|
logo_base64 = get_logo_base64()
|
|
|
|
logo_html = f'<img src="data:image/png;base64,{logo_base64}" alt="Logo" class="logo-img">' if logo_base64 else '<div class="logo-emoji">⚡</div>'
|
|
announcement_html = f'<div class="announcement">{announcement}</div>' if announcement else ''
|
|
|
|
tier_color = "#4CAF50" if tier == "paid" else "#757575"
|
|
tier_name = tier_config.get("name", "Free")
|
|
tier_badge = f'<span class="tier-badge" style="background: {tier_color}">{tier_name}</span>'
|
|
|
|
days_remaining = get_remaining_days(user)
|
|
days_info = f"<p>⏳ Осталось дней: {days_remaining}</p>" if tier == "paid" and days_remaining > 0 else ""
|
|
|
|
traffic = get_cached_traffic(subscription_id)
|
|
if not traffic:
|
|
traffic = get_traffic_stats(subscription_id)
|
|
set_cached_traffic(subscription_id, traffic)
|
|
traffic_limit_val = user.get('traffic_limit_gb') or tier_config.get('traffic_limit_gb', 0)
|
|
traffic_limit_str = "∞" if traffic_limit_val == 0 else f"{traffic_limit_val} GB"
|
|
traffic_info = f"<p>📊 Лимит: {traffic_limit_str}</p><p>⬆️ {format_bytes(traffic['total_up'])} | ⬇️ {format_bytes(traffic['total_down'])}</p>"
|
|
|
|
traffic_details = ""
|
|
if traffic.get("by_server"):
|
|
for srv_name, data in traffic["by_server"].items():
|
|
if data["up"] or data["down"]:
|
|
traffic_details += f'<div class="traffic-server"><span class="server-name">{srv_name}</span>: <span class="traffic-values">⬆ {format_bytes(data["up"])} ⬇ {format_bytes(data["down"])}</span></div>'
|
|
|
|
info_html = f'<div class="info-block">{days_info}{traffic_info}{traffic_details}</div>' if days_info or traffic_info or traffic_details else ''
|
|
|
|
servers_html = "".join(f'<span class="srv-tag">{get_flag_emoji(srv.get("country", ""))} {srv["name"].upper()}</span>' for srv in servers_for_tier)
|
|
|
|
support_btn = ""
|
|
if da_config.get("enabled"):
|
|
da_url = da_config.get("url", "#")
|
|
support_btn = f'''
|
|
<a href="{da_url}" class="btn btn-support" target="_blank">
|
|
Поддержать проект
|
|
</a>
|
|
'''
|
|
|
|
html = render_template("sub.html",
|
|
title=title, logo_html=logo_html, tier_badge=tier_badge,
|
|
announcement_html=announcement_html, info_html=info_html,
|
|
servers_html=servers_html, qr_base64=qr_base64,
|
|
sub_url=sub_url, support_btn=support_btn)
|
|
return HTMLResponse(content=html)
|
|
|
|
@app.post("/payment/webhook/donationalerts")
|
|
async def webhook_donationalerts(request: Request):
|
|
da_config = settings.get("payments", {}).get("donationalerts", {})
|
|
webhook_secret = da_config.get("webhook_secret", "")
|
|
|
|
if webhook_secret:
|
|
provided = request.headers.get("X-Webhook-Secret", "")
|
|
if provided != webhook_secret:
|
|
return JSONResponse({"error": "Invalid secret"}, status_code=403)
|
|
|
|
try:
|
|
data = await request.json()
|
|
except:
|
|
return JSONResponse({"error": "Invalid JSON"}, status_code=400)
|
|
|
|
amount = data.get("amount", 0)
|
|
username = data.get("username", "")
|
|
message = data.get("message", "")
|
|
donation_id = data.get("id", 0)
|
|
|
|
logger.info(f"DA webhook: id={donation_id} amount={amount} username={username}")
|
|
|
|
user = None
|
|
message_parts = message.split() if message else []
|
|
|
|
for part in message_parts:
|
|
conn = get_db()
|
|
try:
|
|
if part.isdigit():
|
|
user = conn.execute("SELECT * FROM users WHERE id = ?", (int(part),)).fetchone()
|
|
if not user:
|
|
user = conn.execute("SELECT * FROM users WHERE username = ? COLLATE NOCASE", (part,)).fetchone()
|
|
if user:
|
|
break
|
|
finally:
|
|
conn.close()
|
|
|
|
if not user and username:
|
|
conn = get_db()
|
|
try:
|
|
user = conn.execute("SELECT * FROM users WHERE username = ? COLLATE NOCASE", (username,)).fetchone()
|
|
finally:
|
|
conn.close()
|
|
|
|
if not user:
|
|
return JSONResponse({"status": "ignored", "reason": "user_not_found"})
|
|
|
|
tiers_config = settings.get("tiers", {})
|
|
tier = None
|
|
days = 0
|
|
|
|
for tier_name, tier_data in tiers_config.items():
|
|
prices = tier_data.get("prices", {})
|
|
tier_days = tier_data.get("days", {})
|
|
for price_key, price_val in prices.items():
|
|
if amount == price_val:
|
|
tier = tier_name
|
|
days = tier_days.get(price_key, 30)
|
|
break
|
|
if tier:
|
|
break
|
|
|
|
if not tier:
|
|
return JSONResponse({"status": "ignored", "reason": "amount_not_recognized"})
|
|
|
|
conn = get_db()
|
|
try:
|
|
now_ts = int(time.time())
|
|
conn.execute("""
|
|
UPDATE users SET
|
|
tier = ?,
|
|
tariff_days_bought = tariff_days_bought + ?,
|
|
tariff_end_at = MAX(COALESCE(tariff_end_at, 0), ?) + ? * 86400,
|
|
total_paid_rubles = total_paid_rubles + ?
|
|
WHERE id = ?
|
|
""", (tier, days, now_ts, days, amount, user["id"]))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
logger.info(f"VPN payment: {username} paid {amount} RUB, +{days} days")
|
|
return JSONResponse({"status": "ok", "user": user["username"], "days": days})
|
|
|
|
@app.get("/admin/users")
|
|
async def admin_users(request: Request):
|
|
if not check_admin(request):
|
|
return HTMLResponse(content=render_template("admin_login.html", title="Вход", error_msg=""))
|
|
conn = get_db()
|
|
try:
|
|
users = conn.execute("SELECT * FROM users ORDER BY created_at DESC").fetchall()
|
|
finally:
|
|
conn.close()
|
|
|
|
da_config = settings.get("payments", {}).get("donationalerts", {})
|
|
|
|
title = settings.get("general", {}).get("title", "ZernProxy")
|
|
tier_options = "".join(
|
|
f'<option value="{k}">{v.get("name", k.capitalize())}</option>'
|
|
for k, v in settings.get("tiers", {}).items())
|
|
users_rows = ""
|
|
for u in users:
|
|
traffic = f'{u["traffic_limit_gb"]} GB' if u['traffic_limit_gb'] > 0 else '∞'
|
|
active = '✓' if u['is_active'] else '✗'
|
|
tier_display = u['tier'].upper()
|
|
rem_days = get_remaining_days(u)
|
|
users_rows += f'''<tr>
|
|
<td>{u['id']}</td>
|
|
<td style="font-weight:600;color:var(--text)">{u['username']}</td>
|
|
<td><code>{u['subscription_id']}</code></td>
|
|
<td><span class="badge badge-{u['tier']}">{tier_display}</span></td>
|
|
<td>{rem_days}</td>
|
|
<td>{u['total_paid_rubles']}₽</td>
|
|
<td>{traffic}</td>
|
|
<td>{active}</td>
|
|
<td>{u['created_at'][:10]}</td>
|
|
<td>
|
|
<button class="btn-sm" onclick="editUser({u['id']}, '{u['tier']}', {rem_days}, {u['traffic_limit_gb']}, {1 if u['is_active'] else 0})">✍️</button>
|
|
<button class="btn-sm danger" onclick="deleteUser({u['id']}, '{u['username']}')">🗑️</button>
|
|
</td>
|
|
</tr>'''
|
|
html = render_template("admin_users.html",
|
|
title=title, user_count=len(users), users_rows=users_rows,
|
|
tier_options=tier_options)
|
|
return HTMLResponse(content=html)
|
|
|
|
@app.get("/admin/dashboard")
|
|
async def admin_dashboard(request: Request):
|
|
if not check_admin(request):
|
|
return HTMLResponse(content=render_template("admin_login.html", title="Вход", error_msg=""))
|
|
|
|
conn = get_db()
|
|
try:
|
|
total_users = conn.execute("SELECT COUNT(*) as c FROM users").fetchone()["c"]
|
|
free_users = conn.execute("SELECT COUNT(*) as c FROM users WHERE tier='free'").fetchone()["c"]
|
|
paid_users = conn.execute("SELECT COUNT(*) as c FROM users WHERE tier='paid'").fetchone()["c"]
|
|
test_users = conn.execute("SELECT COUNT(*) as c FROM users WHERE tier='test'").fetchone()["c"]
|
|
total_revenue = conn.execute("SELECT SUM(total_paid_rubles) as s FROM users").fetchone()["s"] or 0
|
|
total_donations = conn.execute("SELECT SUM(total_paid_rubles) as s FROM users WHERE total_paid_rubles > 0").fetchone()["s"] or 0
|
|
finally:
|
|
conn.close()
|
|
|
|
online_count = 0
|
|
for srv in servers:
|
|
for inbound in srv.get("inbounds", []):
|
|
try:
|
|
api = Api(host=inbound["api_host"], username=inbound["api_user"], password=inbound["api_pass"], use_tls_verify=False)
|
|
api.login()
|
|
online = api.client.online()
|
|
online_count += len(online)
|
|
except:
|
|
pass
|
|
|
|
title = settings.get("general", {}).get("title", "ZernProxy")
|
|
servers_rows = "".join(
|
|
f'<div class="srv-row"><span class="srv-name">{get_flag_emoji(s.get("country",""))} {s["name"].upper()}</span><span class="srv-ibs">{len(s.get("inbounds",[]))} inbound</span><button class="btn-sm" onclick="syncServer(\'{s["name"]}\')">🔄</button></div>'
|
|
for s in servers)
|
|
html = render_template("admin_dashboard.html",
|
|
title=title, total_users=total_users, free_users=free_users,
|
|
paid_users=paid_users, test_users=test_users,
|
|
online_count=online_count, total_revenue=total_revenue,
|
|
server_count=len(servers),
|
|
inbound_count=sum(len(s.get("inbounds",[])) for s in servers),
|
|
servers_rows=servers_rows)
|
|
return HTMLResponse(content=html)
|
|
|
|
@app.get("/")
|
|
async def home_page():
|
|
title = settings.get("general", {}).get("title", "ZernProxy")
|
|
da_url = settings.get("payments", {}).get("donationalerts", {}).get("url", "")
|
|
|
|
statuses = await fetch_servers_status()
|
|
|
|
srv_cards = ""
|
|
for i, s in enumerate(statuses):
|
|
chk = s.get("checks", {})
|
|
cpu = chk.get("CPU", {}).get("value")
|
|
ram = chk.get("RAM", {}).get("value")
|
|
disk = chk.get("Disk /", {}).get("value")
|
|
srv_name = s.get("server_name", s["name"].upper())
|
|
delay = 0.45 + i * 0.15
|
|
online = s.get("online", False)
|
|
|
|
srv_inbounds = []
|
|
for srv_cfg in servers:
|
|
if srv_cfg["name"] == s["name"]:
|
|
srv_inbounds = srv_cfg.get("inbounds", [])
|
|
all_ib = srv_inbounds
|
|
has_free = any(ib.get("is_free", True) for ib in all_ib)
|
|
has_paid = any(not ib.get("is_free", True) for ib in all_ib)
|
|
break
|
|
else:
|
|
has_free = True
|
|
has_paid = True
|
|
|
|
if has_free and has_paid:
|
|
badge = '<span class="tier both">Free+Premium</span>'
|
|
elif has_free:
|
|
badge = '<span class="tier free">Free</span>'
|
|
else:
|
|
badge = '<span class="tier prem">Premium</span>'
|
|
|
|
if online:
|
|
_, hc, hl = calc_health_score(chk)
|
|
s1 = cpu if cpu is not None else 0
|
|
s2 = ram if ram is not None else 0
|
|
s3 = disk if disk is not None else 0
|
|
stats = f'<div class="s-stats">CPU {s1:.0f}% · RAM {s2:.0f}% · DSK {s3:.0f}%</div>'
|
|
else:
|
|
hc = "#475569"
|
|
hl = "Offline"
|
|
stats = '<div class="s-stats off">Offline</div>'
|
|
|
|
inbound_cards = ""
|
|
for ib in srv_inbounds:
|
|
ib_name = ib.get("name", "unknown")
|
|
ib_type = ib.get("network", ib.get("flow", ""))
|
|
inbound_cards += f'<div class="ib-card"><span class="ib-dot"></span><span class="ib-name">{ib_name}</span><span class="ib-type">{ib_type}</span></div>\n'
|
|
|
|
ru_note = ""
|
|
if s.get("country", "").lower() == "ru" and online:
|
|
ru_note = '<div class="ru-note">★ Нет рекламы в YouTube · Адаптирован к будущему ограничению 15 ГБ зарубежного трафика через Proxy</div>'
|
|
|
|
srv_cards += f'''
|
|
<div class="s-card" style="--d: {delay}s; --hl: {hc}">
|
|
<div class="s-head">
|
|
<span class="flag">{get_flag_emoji(s.get("country",""))}</span> {srv_name} {badge}
|
|
<span class="h-dot" style="background:{hc}"></span>
|
|
<span class="h-lbl" style="color:{hc}">{hl}</span>
|
|
</div>
|
|
{stats}
|
|
<div class="ib-list">{inbound_cards}</div>
|
|
{ru_note}
|
|
</div>'''
|
|
|
|
return HTMLResponse(content=render_template("home.html", title=title, da_url=da_url, srv_cards=srv_cards))
|
|
|
|
|
|
async def fetch_servers_status() -> list:
|
|
results = []
|
|
async with httpx.AsyncClient(timeout=8.0, verify=False) as client:
|
|
for srv in servers:
|
|
url = srv.get("status_url", "")
|
|
entry = {"name": srv["name"], "country": srv.get("country",""), "checks": {}, "online": False, "uptime": ""}
|
|
if not url:
|
|
results.append(entry)
|
|
continue
|
|
try:
|
|
resp = await client.get(url)
|
|
if resp.status_code == 200:
|
|
data = resp.json()
|
|
entry["online"] = True
|
|
entry["server_name"] = data.get("server_name", "")
|
|
entry["checks"] = data.get("checks", {})
|
|
else:
|
|
entry["checks"]["error"] = {"value": f"HTTP {resp.status_code}"}
|
|
except Exception as e:
|
|
entry["checks"]["error"] = {"value": str(e)[:50]}
|
|
results.append(entry)
|
|
return results
|
|
|
|
@app.get("/webhook/tg")
|
|
async def tg_webhook():
|
|
bot_cfg = settings.get("bot", {})
|
|
if bot_cfg.get("enabled"):
|
|
return JSONResponse({"status": "ok", "bot": True, "message": "Telegram bot is ready"})
|
|
return JSONResponse({"status": "ok", "bot": False, "message": "Telegram bot not configured"})
|
|
|
|
def create_3xui_client(username: str, sub_id: str, inbound: dict, traffic_gb: int = 0) -> dict:
|
|
"""Создаёт клиента на 3x-UI сервере"""
|
|
api_host = inbound.get("api_host")
|
|
api_user = inbound.get("api_user")
|
|
api_pass = inbound.get("api_pass")
|
|
inbound_id = inbound.get("id")
|
|
|
|
if not all([api_host, api_user, api_pass, inbound_id]):
|
|
return {"success": False, "error": "missing_credentials"}
|
|
|
|
try:
|
|
api = Api(host=api_host, username=api_user, password=api_pass, use_tls_verify=False)
|
|
api.login()
|
|
|
|
inbound_name = inbound.get('name', 'default')
|
|
email = f"{username}_{inbound_name}@vless.local"
|
|
total_bytes = traffic_gb * 1073741824 if traffic_gb > 0 else 0
|
|
|
|
logger.info(f"Creating client: email={email}, total_bytes={total_bytes}, inbound={inbound_name}")
|
|
|
|
from py3xui.client import Client
|
|
if inbound.get("protocol") == "trojan":
|
|
client = Client(
|
|
id="",
|
|
password=str(uuid.uuid4()),
|
|
email=email,
|
|
enable=True,
|
|
total_gb=total_bytes,
|
|
expiry_time=0,
|
|
limit_ip=0,
|
|
subId=sub_id
|
|
)
|
|
else:
|
|
client = Client(
|
|
id=str(uuid.uuid4()),
|
|
email=email,
|
|
enable=True,
|
|
total_gb=total_bytes,
|
|
expiry_time=0,
|
|
limit_ip=0,
|
|
subId=sub_id
|
|
)
|
|
|
|
try:
|
|
existing = api.client.get_by_email(email)
|
|
if existing:
|
|
logger.info(f"Deleting existing client: {existing.id}")
|
|
api.client.delete(existing.id, inbound_id)
|
|
except:
|
|
pass
|
|
|
|
logger.info(f"About to add client: {client}")
|
|
api.client.add(inbound_id=inbound_id, clients=[client])
|
|
logger.info(f"Client created successfully on {inbound_name}")
|
|
return {"success": True, "email": email}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating client: {e}")
|
|
import traceback
|
|
logger.error(traceback.format_exc())
|
|
return {"success": False, "error": str(e)[:150]}
|
|
|
|
def delete_3xui_client(username: str, sub_id: str, inbound: dict) -> dict:
|
|
"""Удаляет клиента с 3x-UI сервера"""
|
|
api_host = inbound.get("api_host")
|
|
api_user = inbound.get("api_user")
|
|
api_pass = inbound.get("api_pass")
|
|
inbound_id = inbound.get("id")
|
|
|
|
if not all([api_host, api_user, api_pass, inbound_id]):
|
|
return {"success": False, "error": "missing_credentials"}
|
|
|
|
try:
|
|
api = Api(host=api_host, username=api_user, password=api_pass, use_tls_verify=False)
|
|
api.login()
|
|
|
|
inbounds = api.inbound.get_list()
|
|
for ib in inbounds:
|
|
if ib.id == inbound_id and ib.client_stats:
|
|
for client in ib.client_stats:
|
|
if getattr(client, 'sub_id', '') == sub_id:
|
|
api.client.delete(inbound_id, client.uuid)
|
|
logger.info(f"Deleted client from {inbound.get('name')}")
|
|
return {"success": True}
|
|
|
|
return {"success": True, "error": "not_found"}
|
|
|
|
except Exception as e:
|
|
return {"success": False, "error": str(e)[:100]}
|
|
|
|
def fetch_server_sub_ids(srv: dict) -> dict:
|
|
result = {}
|
|
srv_name = srv.get("name", "?")
|
|
for inbound in srv.get("inbounds", []):
|
|
api_host = inbound.get("api_host")
|
|
api_user = inbound.get("api_user")
|
|
api_pass = inbound.get("api_pass")
|
|
inbound_id = inbound.get("id")
|
|
ib_name = inbound.get("name", "?")
|
|
if not all([api_host, api_user, api_pass, inbound_id]):
|
|
continue
|
|
try:
|
|
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
|
|
def _fetch():
|
|
api = Api(host=api_host, username=api_user, password=api_pass, use_tls_verify=False)
|
|
api.login()
|
|
ids = set()
|
|
for ib in api.inbound.get_list():
|
|
if ib.id == inbound_id and ib.client_stats:
|
|
for client in ib.client_stats:
|
|
sid = getattr(client, 'sub_id', '') or ''
|
|
if sid:
|
|
ids.add(sid)
|
|
return ids
|
|
with ThreadPoolExecutor(max_workers=1) as ex:
|
|
future = ex.submit(_fetch)
|
|
try:
|
|
fetched = future.result(timeout=20)
|
|
result[inbound_id] = fetched
|
|
logger.info(f"fetch_server_sub_ids {srv_name}/{ib_name}: got {len(fetched)} sub_ids")
|
|
except FutureTimeout:
|
|
logger.warning(f"Timeout fetching clients from {srv_name}/{ib_name}")
|
|
except Exception as e:
|
|
logger.warning(f"fetch_server_sub_ids {srv_name}/{ib_name} error: {e}")
|
|
logger.info(f"fetch_server_sub_ids {srv_name}: inbound count {len(result)}")
|
|
return result
|
|
|
|
def propagate_server_sync(server_name: str) -> dict:
|
|
target_srv = next((s for s in servers if s["name"] == server_name), None)
|
|
if not target_srv:
|
|
return {"error": "Server not found"}
|
|
conn = get_db()
|
|
try:
|
|
users = conn.execute("SELECT username, subscription_id, traffic_limit_gb, is_active FROM users WHERE is_active = 1").fetchall()
|
|
finally:
|
|
conn.close()
|
|
other_servers = [s for s in servers if s.get("is_active", True) and s["name"] != server_name]
|
|
threshold = max(1, len(other_servers) // 2 + 1)
|
|
|
|
target_ib_ids = fetch_server_sub_ids(target_srv)
|
|
other_ids = {}
|
|
for s in other_servers:
|
|
ib_dict = fetch_server_sub_ids(s)
|
|
flat = set()
|
|
for ids in ib_dict.values():
|
|
flat.update(ids)
|
|
other_ids[s["name"]] = flat
|
|
|
|
total = len(users)
|
|
added = skipped = failed = 0
|
|
inbound_stats = {}
|
|
for ib in target_srv.get("inbounds", []):
|
|
inbound_stats[ib.get("name", f"id_{ib.get('id','?')}")] = 0
|
|
|
|
results = []
|
|
for u in users:
|
|
sub_id = u["subscription_id"]
|
|
username = u["username"]
|
|
|
|
count = sum(1 for s in other_servers if sub_id in other_ids.get(s["name"], set()))
|
|
if count < threshold:
|
|
skipped += 1
|
|
results.append({"username": username, "skipped": True, "servers": count, "total": len(other_servers)})
|
|
continue
|
|
|
|
user_added = False
|
|
for inbound in target_srv.get("inbounds", []):
|
|
ib_id = inbound.get("id")
|
|
ib_name = inbound.get("name", "?")
|
|
existing = target_ib_ids.get(ib_id, set())
|
|
if sub_id in existing:
|
|
continue
|
|
r = create_3xui_client(username, sub_id, inbound, u["traffic_limit_gb"] or 0)
|
|
if r.get("success"):
|
|
user_added = True
|
|
inbound_stats[ib_name] = inbound_stats.get(ib_name, 0) + 1
|
|
clear_cache(sub_id)
|
|
else:
|
|
failed += 1
|
|
results.append({"username": username, "inbound": ib_name, "error": r.get("error", "")})
|
|
|
|
if user_added:
|
|
added += 1
|
|
results.append({"username": username, "added": True})
|
|
|
|
already = sum(1 for u in users if u["subscription_id"] in {sid for ids in target_ib_ids.values() for sid in ids})
|
|
return {"server": server_name, "total": total, "already_on_server": already,
|
|
"added": added, "skipped": skipped, "failed": failed,
|
|
"threshold": f"{threshold}/{len(other_servers)}",
|
|
"per_inbound_added": inbound_stats, "results": results}
|
|
|
|
@app.post("/admin/api/propagate/{server_name}")
|
|
async def propagate_server(request: Request, server_name: str):
|
|
if not check_admin(request):
|
|
return JSONResponse({"error": "Unauthorized"}, status_code=401)
|
|
result = propagate_server_sync(server_name)
|
|
if "error" in result:
|
|
return JSONResponse(result, status_code=404)
|
|
return JSONResponse(result)
|
|
|
|
@app.post("/admin/api/users")
|
|
async def create_user(request: Request, data: dict):
|
|
if not check_admin(request):
|
|
return JSONResponse({"error": "Unauthorized"}, status_code=401)
|
|
username = data.get("username", "").strip()
|
|
if not username:
|
|
return JSONResponse({"error": "username required"}, status_code=400)
|
|
|
|
sub_id = generate_sub_id()
|
|
conn = get_db()
|
|
|
|
traffic_gb = int(data.get("traffic_limit_gb", 0) or 0)
|
|
logger.info(f"Creating user: username={username}, traffic_gb={traffic_gb}")
|
|
|
|
results = []
|
|
for srv in servers:
|
|
srv_result = {"server": srv["name"], "inbounds": []}
|
|
for inbound in srv.get("inbounds", []):
|
|
result = create_3xui_client(username, sub_id, inbound, traffic_gb)
|
|
srv_result["inbounds"].append({
|
|
"name": inbound.get("name"),
|
|
"success": result["success"],
|
|
"error": result.get("error", "")
|
|
})
|
|
results.append(srv_result)
|
|
|
|
try:
|
|
conn.execute("""
|
|
INSERT INTO users (username, subscription_id, tier, tariff_days_bought, tariff_days_remaining, total_paid_rubles, traffic_limit_gb, is_active)
|
|
VALUES (?, ?, 'free', 0, 0, 0, 0, 1)
|
|
""", (username, sub_id))
|
|
conn.commit()
|
|
except sqlite3.IntegrityError:
|
|
conn.close()
|
|
return JSONResponse({"error": "username exists"}, status_code=400)
|
|
finally:
|
|
conn.close()
|
|
|
|
success_count = sum(1 for r in results for ib in r["inbounds"] if ib["success"])
|
|
total_count = sum(len(r["inbounds"]) for r in results)
|
|
|
|
return JSONResponse({
|
|
"status": "ok",
|
|
"username": username,
|
|
"subscription_id": sub_id,
|
|
"subscription_url": f"https://{settings.get('general', {}).get('host', 'conn.zernmc.ru')}/sub/{sub_id}",
|
|
"results": results,
|
|
"summary": f"{success_count}/{total_count} инбаундов создано"
|
|
})
|
|
|
|
@app.post("/admin/api/users/update")
|
|
async def update_user(request: Request, data: dict):
|
|
if not check_admin(request):
|
|
return JSONResponse({"error": "Unauthorized"}, status_code=401)
|
|
logger.info(f"RAW data received: {data}")
|
|
|
|
user_id = int(data.get("id", 0))
|
|
tier = str(data.get("tier", "free"))
|
|
tariff_days_remaining = int(data.get("tariff_days_remaining", 0) or 0)
|
|
tariff_end_at = int(time.time()) + tariff_days_remaining * 86400 if tariff_days_remaining > 0 else 0
|
|
traffic_limit_gb = int(data.get("traffic_limit_gb", 0) or 0)
|
|
is_active = 1 if data.get("is_active") in [True, "true", "on", "1", 1] else 0
|
|
|
|
logger.info(f"Update user: id={user_id}, tier='{tier}', days={tariff_days_remaining}, traffic={traffic_limit_gb}, active={is_active}")
|
|
|
|
conn = get_db()
|
|
try:
|
|
conn.execute("""
|
|
UPDATE users SET tier = ?, tariff_days_remaining = ?, tariff_end_at = ?, traffic_limit_gb = ?, is_active = ?
|
|
WHERE id = ?
|
|
""", (tier, tariff_days_remaining, tariff_end_at, traffic_limit_gb, is_active, user_id))
|
|
conn.commit()
|
|
|
|
updated = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
|
|
logger.info(f"After update DB: {dict(updated)}")
|
|
finally:
|
|
conn.close()
|
|
|
|
return JSONResponse({"status": "ok"})
|
|
|
|
@app.post("/admin/api/users/delete")
|
|
async def delete_user(request: Request, data: dict):
|
|
if not check_admin(request):
|
|
return JSONResponse({"error": "Unauthorized"}, status_code=401)
|
|
user_id = data.get("id")
|
|
username = data.get("username", "")
|
|
|
|
conn = get_db()
|
|
try:
|
|
user = conn.execute("SELECT subscription_id FROM users WHERE id = ?", (user_id,)).fetchone()
|
|
if not user:
|
|
return JSONResponse({"error": "user not found"}, status_code=404)
|
|
|
|
sub_id = user["subscription_id"]
|
|
|
|
for srv in servers:
|
|
for inbound in srv.get("inbounds", []):
|
|
result = delete_3xui_client(username, sub_id, inbound)
|
|
|
|
conn.execute("DELETE FROM users WHERE id = ?", (user_id,))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
return JSONResponse({"status": "ok", "message": "User deleted from all servers"})
|
|
|
|
async def rotate_shortids():
|
|
while True:
|
|
try:
|
|
rotation_hours = settings.get("shortid_rotation_hours", 11)
|
|
await asyncio.sleep(rotation_hours * 3600)
|
|
|
|
for srv in servers:
|
|
for inbound in srv.get("inbounds", []):
|
|
api_host = inbound.get("api_host")
|
|
api_user = inbound.get("api_user")
|
|
api_pass = inbound.get("api_pass")
|
|
|
|
if not all([api_host, api_user, api_pass]):
|
|
continue
|
|
|
|
try:
|
|
api = Api(host=api_host, username=api_user, password=api_pass, use_tls_verify=False)
|
|
api.login()
|
|
|
|
new_shortid = secrets.token_hex(4)
|
|
|
|
inbounds = api.inbound.get_inbounds()
|
|
for ib in inbounds:
|
|
if ib.id == inbound.get("id"):
|
|
ib.shortid = new_shortid
|
|
api.inbound.update_inbound(ib)
|
|
logger.info(f"ShortID rotated for {srv['name']}/{inbound['name']}: {new_shortid}")
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"ShortID rotation error {srv['name']}/{inbound['name']}: {e}")
|
|
except Exception as e:
|
|
logger.error(f"ShortID rotation error: {e}")
|
|
await asyncio.sleep(3600)
|
|
|
|
def update_3xui_expiry(sub_id: str, days: int):
|
|
"""Обновляет expiry_time на всех 3x-UI серверах"""
|
|
import time
|
|
expiry_timestamp = int(time.time() + (days * 86400))
|
|
|
|
for srv in servers:
|
|
for inbound in srv.get("inbounds", []):
|
|
api_host = inbound.get("api_host")
|
|
api_user = inbound.get("api_user")
|
|
api_pass = inbound.get("api_pass")
|
|
inbound_id = inbound.get("id")
|
|
|
|
if not all([api_host, api_user, api_pass, inbound_id]):
|
|
continue
|
|
|
|
try:
|
|
api = Api(host=api_host, username=api_user, password=api_pass, use_tls_verify=False)
|
|
api.login()
|
|
|
|
inbounds = api.inbound.get_list()
|
|
for ib in inbounds:
|
|
if ib.id == inbound_id and ib.client_stats:
|
|
for client in ib.client_stats:
|
|
if getattr(client, 'sub_id', '') == sub_id:
|
|
client.expiry_time = expiry_timestamp
|
|
api.client.update(client.uuid, client)
|
|
logger.info(f"Updated expiry for {srv['name']}/{inbound['name']}: {days} days")
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"Error updating expiry {srv['name']}/{inbound['name']}: {e}")
|
|
|
|
async def poll_donationalerts():
|
|
global last_donation_id
|
|
while True:
|
|
try:
|
|
da_config = settings.get("payments", {}).get("donationalerts", {})
|
|
if not da_config.get("enabled"):
|
|
await asyncio.sleep(300)
|
|
continue
|
|
|
|
api_token = da_config.get("api_token", "")
|
|
if not api_token:
|
|
logger.warning("DonationAlerts API token not configured")
|
|
await asyncio.sleep(300)
|
|
continue
|
|
|
|
interval = da_config.get("check_interval_minutes", 5)
|
|
await asyncio.sleep(interval * 60)
|
|
|
|
async with httpx.AsyncClient(timeout=30.0) as client:
|
|
resp = await client.get(
|
|
"https://www.donationalerts.com/api/v1/alerts/donations",
|
|
headers={"Authorization": f"Bearer {api_token}"}
|
|
)
|
|
|
|
if resp.status_code != 200:
|
|
logger.error(f"DA API error: {resp.status_code}")
|
|
continue
|
|
|
|
data = resp.json()
|
|
donations = data.get("data", [])
|
|
|
|
for donation in reversed(donations):
|
|
donation_id = donation.get("id")
|
|
if donation_id <= last_donation_id:
|
|
continue
|
|
|
|
amount = donation.get("amount", 0)
|
|
username = donation.get("username", "")
|
|
message = donation.get("message", "")
|
|
|
|
tiers_config = settings.get("tiers", {})
|
|
|
|
tier = None
|
|
days = 0
|
|
|
|
for tier_name, tier_data in tiers_config.items():
|
|
prices = tier_data.get("prices", {})
|
|
tier_days = tier_data.get("days", {})
|
|
for price_key, price_val in prices.items():
|
|
if amount == price_val:
|
|
tier = tier_name
|
|
days = tier_days.get(price_key, 30)
|
|
break
|
|
if tier:
|
|
break
|
|
|
|
if not tier:
|
|
logger.info(f"DA: ignoring amount {amount} RUB (not in config)")
|
|
last_donation_id = donation_id
|
|
continue
|
|
|
|
user = None
|
|
message_parts = message.split() if message else []
|
|
|
|
conn = get_db()
|
|
try:
|
|
for part in message_parts:
|
|
if part.isdigit():
|
|
user = conn.execute("SELECT * FROM users WHERE id = ?", (int(part),)).fetchone()
|
|
if not user:
|
|
user = conn.execute("SELECT * FROM users WHERE username = ? COLLATE NOCASE", (part,)).fetchone()
|
|
if user:
|
|
break
|
|
|
|
if not user and username:
|
|
user = conn.execute("SELECT * FROM users WHERE username = ? COLLATE NOCASE", (username,)).fetchone()
|
|
|
|
if user:
|
|
now_ts = int(time.time())
|
|
current_end = user.get("tariff_end_at", 0)
|
|
if current_end > now_ts:
|
|
new_end = current_end + days * 86400
|
|
else:
|
|
new_end = now_ts + days * 86400
|
|
|
|
conn.execute("""
|
|
UPDATE users SET
|
|
tier = ?,
|
|
tariff_days_bought = tariff_days_bought + ?,
|
|
tariff_end_at = ?,
|
|
total_paid_rubles = total_paid_rubles + ?
|
|
WHERE id = ?
|
|
""", (tier, days, new_end, amount, user["id"]))
|
|
conn.commit()
|
|
|
|
sub_id = user["subscription_id"]
|
|
update_3xui_expiry(sub_id, days)
|
|
|
|
logger.info(f"VPN payment: {username} paid {amount} RUB, tier={tier}, +{days} days")
|
|
finally:
|
|
conn.close()
|
|
|
|
last_donation_id = donation_id
|
|
|
|
except Exception as e:
|
|
logger.error(f"DonationAlerts polling error: {e}")
|
|
await asyncio.sleep(300)
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
return {"status": "ok", "servers": len(servers), "settings_loaded": bool(settings)}
|
|
|
|
@app.post("/admin/api/reload")
|
|
async def reload_configs(request: Request):
|
|
if not check_admin(request):
|
|
return JSONResponse({"error": "Unauthorized"}, status_code=401)
|
|
load_configs()
|
|
clear_cache()
|
|
return {"status": "ok"}
|
|
|
|
@app.get("/admin/api/rotate-shortids")
|
|
async def manual_rotate(request: Request):
|
|
if not check_admin(request):
|
|
return JSONResponse({"error": "Unauthorized"}, status_code=401)
|
|
await rotate_shortids()
|
|
return {"status": "ok"}
|
|
|
|
if __name__ == "__main__":
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
loop.create_task(rotate_shortids())
|
|
loop.create_task(poll_donationalerts())
|
|
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
|