diff --git a/aggregator.py b/aggregator.py index a92a160..52d8fa0 100644 --- a/aggregator.py +++ b/aggregator.py @@ -41,6 +41,7 @@ USERS_DB = os.path.join(BASE_DIR, "users.db") LOGO_PATH = os.path.join(BASE_DIR, "logo.png") MOTD_PATH = os.path.join(BASE_DIR, "motd.txt") WEB_DIR = os.path.join(BASE_DIR, "web") +HASHES_FILE = os.path.join(BASE_DIR, "hashes.txt") servers = [] settings = {} @@ -81,6 +82,128 @@ def clear_cache(sub_id: str = None): _links_cache.clear() _traffic_cache.clear() +def get_wdtt_servers() -> List[dict]: + """Возвращает серверы с is_wdtt: true""" + return [s for s in servers if s.get("is_wdtt") and s.get("is_active")] + +async def _wdtt_api_call(server: dict, method: str, path: str, body: dict = None) -> dict: + """Вызов REST API wdtt-server""" + api_key = server.get("wdtt_api_key", "") + host = server.get("wdtt_host", "") + if not api_key or not host: + return {"error": "wdtt server not configured"} + + port = server.get("wdtt_api_port", "8080") + url = f"http://{host}:{port}{path}" + headers = {"X-API-Key": api_key, "Content-Type": "application/json"} + + try: + async with httpx.AsyncClient(timeout=10.0, verify=False) as client: + if method == "POST": + resp = await client.post(url, json=body, headers=headers) + elif method == "DELETE": + resp = await client.request("DELETE", url, json=body, headers=headers) + else: + resp = await client.get(url, headers=headers) + + if resp.status_code in (200, 201): + return resp.json() + else: + try: + err = resp.json() + return {"error": err.get("error", f"HTTP {resp.status_code}")} + except: + return {"error": f"HTTP {resp.status_code}"} + except Exception as e: + logger.error(f"wdtt API call error: {e}") + return {"error": str(e)[:100]} + +async def sync_wdtt_password(user: dict): + """Создаёт или обновляет wdtt-пароль для премиум пользователя""" + wdtt_servers = get_wdtt_servers() + if not wdtt_servers: + return + + tier = user.get("tier", "free") + remaining_days = get_remaining_days(user) + + conn = get_db() + try: + db_user = conn.execute("SELECT * FROM users WHERE id = ?", (user["id"],)).fetchone() + if not db_user: + return + db_user = dict(db_user) + + expires_at = db_user.get("tariff_end_at", 0) + current_password = db_user.get("wdtt_password") + + # Если не премиум — удаляем пароль + if tier != "paid" or remaining_days <= 0: + if current_password: + for srv in wdtt_servers: + await _wdtt_api_call(srv, "DELETE", "/api/passwords/remove", + {"password": current_password}) + conn.execute("UPDATE users SET wdtt_password = NULL, wdtt_expires_at = 0, wdtt_server_id = NULL WHERE id = ?", + (user["id"],)) + conn.commit() + logger.info(f"WDTT password removed for user {user['username']}") + return + + # Берём первый wdtt-сервер + server = wdtt_servers[0] + server_id = servers.index(server) + + if current_password: + # Обновляем expiry + result = await _wdtt_api_call(server, "POST", "/api/passwords/update", + {"password": current_password, "expires_at": expires_at}) + if result.get("status") == "ok": + conn.execute("UPDATE users SET wdtt_expires_at = ?, wdtt_server_id = ? WHERE id = ?", + (expires_at, server_id, user["id"])) + conn.commit() + logger.info(f"WDTT password updated for {user['username']}, expires: {expires_at}") + else: + logger.warning(f"Failed to update WDTT password for {user['username']}: {result.get('error')}") + else: + # Генерируем новый пароль + new_password = secrets.token_urlsafe(24) + result = await _wdtt_api_call(server, "POST", "/api/passwords/add", + {"password": new_password, "expires_at": expires_at}) + if result.get("status") == "ok": + conn.execute(""" + UPDATE users SET wdtt_password = ?, wdtt_expires_at = ?, wdtt_server_id = ? + WHERE id = ? + """, (new_password, expires_at, server_id, user["id"])) + conn.commit() + logger.info(f"WDTT password created for {user['username']}") + else: + logger.warning(f"Failed to create WDTT password for {user['username']}: {result.get('error')}") + finally: + conn.close() + +async def remove_wdtt_password(user: dict): + """Удаляет wdtt-пароль пользователя""" + conn = get_db() + try: + db_user = conn.execute("SELECT * FROM users WHERE id = ?", (user["id"],)).fetchone() + if not db_user: + return + db_user = dict(db_user) + password = db_user.get("wdtt_password") + if not password: + return + + for srv in get_wdtt_servers(): + await _wdtt_api_call(srv, "DELETE", "/api/passwords/remove", + {"password": password}) + + conn.execute("UPDATE users SET wdtt_password = NULL, wdtt_expires_at = 0, wdtt_server_id = NULL WHERE id = ?", + (user["id"],)) + conn.commit() + logger.info(f"WDTT password removed for {user['username']}") + finally: + conn.close() + def get_remaining_days(user) -> int: if not isinstance(user, dict): user = dict(user) @@ -126,6 +249,18 @@ def init_db(): conn.execute("ALTER TABLE users ADD COLUMN tariff_end_at INTEGER DEFAULT 0") except: pass + try: + conn.execute("ALTER TABLE users ADD COLUMN wdtt_password TEXT DEFAULT NULL") + except: + pass + try: + conn.execute("ALTER TABLE users ADD COLUMN wdtt_expires_at INTEGER DEFAULT 0") + except: + pass + try: + conn.execute("ALTER TABLE users ADD COLUMN wdtt_server_id INTEGER DEFAULT NULL") + except: + pass now = int(time.time()) conn.execute("UPDATE users SET tariff_end_at = ? + tariff_days_remaining * 86400 WHERE tariff_end_at = 0 AND tariff_days_remaining > 0", (now,)) conn.commit() @@ -195,13 +330,21 @@ def get_logo_base64() -> Optional[str]: return None def get_motd() -> str: - if os.path.exists(MOTD_PATH): - try: - with open(MOTD_PATH, "r", encoding="utf-8") as f: - return f.read().strip() - except: - pass - return "" + if not os.path.exists(MOTD_PATH): + return "" + with open(MOTD_PATH, encoding="utf-8") as f: + return f.read().strip() + +def get_random_hashes(count: int = 4) -> List[str]: + """Читает до `count` случайных хешей из hashes.txt""" + if not os.path.exists(HASHES_FILE): + return [] + with open(HASHES_FILE, encoding="utf-8") as f: + lines = [line.strip() for line in f if line.strip() and not line.startswith("#")] + if not lines: + return [] + import random + return random.sample(lines, min(count, len(lines))) def format_bytes(bytes_val: int) -> str: if bytes_val == 0: @@ -612,6 +755,35 @@ async def get_subscription(request: Request, subscription_id: str, format: str = if support_url: headers["Sub-Expire-Button-Link"] = support_url + # WDTT-специфичные заголовки для клиента ZernProxy + is_wdtt_client = request.query_params.get("client") == "wdtt" or "WDTT" in request.headers.get("User-Agent", "") + if is_wdtt_client or user.get("wdtt_password"): + wdtt_servers = get_wdtt_servers() + if wdtt_servers: + server_list = [] + for s in wdtt_servers: + server_list.append({ + "name": s.get("name", ""), + "host": s.get("wdtt_host", ""), + "dtls_port": s.get("wdtt_dtls_port", 56000), + "wg_port": s.get("wdtt_wg_port", 56001), + "country": s.get("country", ""), + }) + headers["X-WDTT-Servers"] = json.dumps(server_list) + + if user.get("wdtt_password"): + headers["X-WDTT-Credentials"] = json.dumps({ + "password": user["wdtt_password"], + "expires_at": user.get("wdtt_expires_at", 0), + "server_id": user.get("wdtt_server_id", 0), + }) + + # ZernHash: случайные хеши для клиента + if is_wdtt_client or "ZernHash" in request.headers.get("User-Agent", ""): + hashes = get_random_hashes(4) + if hashes: + headers["X-WDTT-Hashes"] = json.dumps(hashes) + if format == "base64": return Response(content=base64.b64encode(content.encode()).decode(), media_type="text/plain; charset=utf-8", headers=headers) return Response(content=content, media_type="text/plain; charset=utf-8", headers=headers) @@ -755,6 +927,10 @@ async def webhook_donationalerts(request: Request): WHERE id = ? """, (tier, days, now_ts, days, amount, user["id"])) conn.commit() + + updated = conn.execute("SELECT * FROM users WHERE id = ?", (user["id"],)).fetchone() + if updated: + asyncio.create_task(sync_wdtt_password(dict(updated))) finally: conn.close() @@ -1223,6 +1399,12 @@ async def update_user(request: Request, data: dict): finally: conn.close() + if updated: + updated = dict(updated) + updated["tier"] = tier + clear_cache(updated["subscription_id"]) + asyncio.create_task(sync_wdtt_password(updated)) + return JSONResponse({"status": "ok"}) @app.post("/admin/api/users/delete") @@ -1234,16 +1416,19 @@ async def delete_user(request: Request, data: dict): conn = get_db() try: - user = conn.execute("SELECT subscription_id FROM users WHERE id = ?", (user_id,)).fetchone() + user = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone() if not user: return JSONResponse({"error": "user not found"}, status_code=404) + user = dict(user) sub_id = user["subscription_id"] for srv in servers: for inbound in srv.get("inbounds", []): result = delete_3xui_client(username, sub_id, inbound) + asyncio.create_task(remove_wdtt_password(user)) + conn.execute("DELETE FROM users WHERE id = ?", (user_id,)) conn.commit() finally: @@ -1414,6 +1599,10 @@ async def poll_donationalerts(): sub_id = user["subscription_id"] update_3xui_expiry(sub_id, days) + updated = conn.execute("SELECT * FROM users WHERE id = ?", (user["id"],)).fetchone() + if updated: + asyncio.create_task(sync_wdtt_password(dict(updated))) + logger.info(f"VPN payment: {username} paid {amount} RUB, tier={tier}, +{days} days") finally: conn.close()