diff --git a/aggregator.py b/aggregator.py index 2888553..ca51314 100644 --- a/aggregator.py +++ b/aggregator.py @@ -1004,28 +1004,8 @@ def delete_3xui_client(username: str, sub_id: str, inbound: dict) -> dict: except Exception as e: return {"success": False, "error": str(e)[:100]} -def user_exists_on_server(sub_id: str, srv: dict) -> bool: - for inbound in srv.get("inbounds", []): - api_host = inbound.get("api_host") - api_user = inbound.get("api_user") - api_pass = inbound.get("api_pass") - inbound_id = inbound.get("id") - if not all([api_host, api_user, api_pass, inbound_id]): - continue - try: - api = Api(host=api_host, username=api_user, password=api_pass, use_tls_verify=False) - api.login() - for ib in api.inbound.get_list(): - if ib.id == inbound_id and ib.client_stats: - for client in ib.client_stats: - if getattr(client, 'sub_id', '') == sub_id: - return True - except: - pass - return False - -def fetch_server_sub_ids(srv: dict) -> set: - ids = set() +def fetch_server_sub_ids(srv: dict) -> dict: + result = {} srv_name = srv.get("name", "?") for inbound in srv.get("inbounds", []): api_host = inbound.get("api_host") @@ -1040,26 +1020,26 @@ def fetch_server_sub_ids(srv: dict) -> set: def _fetch(): api = Api(host=api_host, username=api_user, password=api_pass, use_tls_verify=False) api.login() - result = set() + ids = set() for ib in api.inbound.get_list(): if ib.id == inbound_id and ib.client_stats: for client in ib.client_stats: sid = getattr(client, 'sub_id', '') or '' if sid: - result.add(sid) - return result + ids.add(sid) + return ids with ThreadPoolExecutor(max_workers=1) as ex: future = ex.submit(_fetch) try: fetched = future.result(timeout=20) - ids.update(fetched) + result[inbound_id] = fetched logger.info(f"fetch_server_sub_ids {srv_name}/{ib_name}: got {len(fetched)} sub_ids") except FutureTimeout: logger.warning(f"Timeout fetching clients from {srv_name}/{ib_name}") except Exception as e: logger.warning(f"fetch_server_sub_ids {srv_name}/{ib_name} error: {e}") - logger.info(f"fetch_server_sub_ids {srv_name}: total {len(ids)} sub_ids") - return ids + logger.info(f"fetch_server_sub_ids {srv_name}: inbound count {len(result)}") + return result def propagate_server_sync(server_name: str) -> dict: target_srv = next((s for s in servers if s["name"] == server_name), None) @@ -1072,38 +1052,58 @@ def propagate_server_sync(server_name: str) -> dict: conn.close() other_servers = [s for s in servers if s.get("is_active", True) and s["name"] != server_name] threshold = max(1, len(other_servers) // 2 + 1) - target_ids = fetch_server_sub_ids(target_srv) + + target_ib_ids = fetch_server_sub_ids(target_srv) other_ids = {} for s in other_servers: - other_ids[s["name"]] = fetch_server_sub_ids(s) + ib_dict = fetch_server_sub_ids(s) + flat = set() + for ids in ib_dict.values(): + flat.update(ids) + other_ids[s["name"]] = flat + total = len(users) - already = added = skipped = failed = 0 + added = skipped = failed = 0 + inbound_stats = {} + for ib in target_srv.get("inbounds", []): + inbound_stats[ib.get("name", f"id_{ib.get('id','?')}")] = 0 + results = [] for u in users: sub_id = u["subscription_id"] username = u["username"] - if sub_id in target_ids: - already += 1 - continue + count = sum(1 for s in other_servers if sub_id in other_ids.get(s["name"], set())) - if count >= threshold: - ok = True - for inbound in target_srv.get("inbounds", []): - r = create_3xui_client(username, sub_id, inbound, u["traffic_limit_gb"] or 0) - if not r.get("success"): - ok = False - if ok: - added += 1 + if count < threshold: + skipped += 1 + results.append({"username": username, "skipped": True, "servers": count, "total": len(other_servers)}) + continue + + user_added = False + for inbound in target_srv.get("inbounds", []): + ib_id = inbound.get("id") + ib_name = inbound.get("name", "?") + existing = target_ib_ids.get(ib_id, set()) + if sub_id in existing: + continue + r = create_3xui_client(username, sub_id, inbound, u["traffic_limit_gb"] or 0) + if r.get("success"): + user_added = True + inbound_stats[ib_name] = inbound_stats.get(ib_name, 0) + 1 clear_cache(sub_id) else: failed += 1 - results.append({"username": username, "added": ok}) - else: - skipped += 1 - results.append({"username": username, "skipped": True, "servers": count, "total": len(other_servers)}) + results.append({"username": username, "inbound": ib_name, "error": r.get("error", "")}) + + if user_added: + added += 1 + results.append({"username": username, "added": True}) + + already = sum(1 for u in users if u["subscription_id"] in {sid for ids in target_ib_ids.values() for sid in ids}) return {"server": server_name, "total": total, "already_on_server": already, "added": added, "skipped": skipped, "failed": failed, - "threshold": f"{threshold}/{len(other_servers)}", "results": results} + "threshold": f"{threshold}/{len(other_servers)}", + "per_inbound_added": inbound_stats, "results": results} @app.post("/admin/api/propagate/{server_name}") async def propagate_server(request: Request, server_name: str):