Files
SashegDev c0310ed573 test(server): add comprehensive test suite (47 tests), fix DB lock and schema bugs
- Add pytest test suite: test_auth.py, test_admin.py, test_pass.py,
  test_proxy.py, test_rate_limit.py, test_client_contract.py
- Fix SQLite 'database is locked' errors: moved log_audit() calls outside
  with get_db() blocks in register, login, logout, refresh, activate_pass
- Enable WAL mode and busy_timeout in get_db() for concurrent access
- Fix /admin/me: removed non-existent 'email' column from query
- Fix /admin/users list: disambiguated activated_at column in JOIN query
- Fix /auth/refresh: now returns refresh_token + expires_in + username/uuid/role
  to match AuthManager.AuthSession expectations; revokes old refresh token
- Fix conftest.py: unique usernames per test to avoid conflicts
- All 47 tests passing
2026-05-04 22:14:06 +00:00

589 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# admin_router.py
from fastapi import APIRouter, HTTPException, Depends, Request, status
from pydantic import BaseModel, Field
from typing import Optional, List
import structlog
import time
import secrets
from datetime import datetime
from auth import get_db, require_role, log_audit, get_current_user
from roles import (
ROLE_PERMISSIONS, UserRole, ROLE_NAMES, has_permission, Permissions,
ROLE_USER, ROLE_PASS_HOLDER, ROLE_MODERATOR, ROLE_ELDER, ROLE_CREATOR
)
logger = structlog.get_logger(__name__)
router = APIRouter(prefix="/admin", tags=["admin"])
# ====================== МОДЕЛИ ======================
class UpdateRoleRequest(BaseModel):
user_id: int
role: int = Field(..., ge=0, le=4)
class PassRequest(BaseModel):
username: str
reason: Optional[str] = None
class PassDecision(BaseModel):
request_id: int
approved: bool
reason: Optional[str] = None
class CreatePassDirectRequest(BaseModel):
username: str
expires_days: Optional[int] = Field(None, ge=1, le=365)
max_uses: int = Field(1, ge=1, le=10)
class BanUserRequest(BaseModel):
user_id: int
days: int = Field(..., ge=1, le=365)
reason: str
# ====================== ЭНДПОИНТЫ ======================
@router.get("/users")
async def list_users(
current_user: dict = Depends(require_role(ROLE_MODERATOR)),
search: Optional[str] = None
):
"""Список пользователей (модераторы видят всех, но без sensitive данных)"""
with get_db() as conn:
query = "SELECT id, username, uuid, role, created_at, last_login, is_active"
params = []
if current_user["role"] < ROLE_ELDER:
# Модераторы не видят забаненных
query += " FROM users WHERE is_active = 1"
else:
query += " FROM users"
if search:
query += " AND (username LIKE ? OR email LIKE ?)"
params.extend([f"%{search}%", f"%{search}%"])
query += " ORDER BY role DESC, username"
rows = conn.execute(query, params).fetchall()
users = []
for row in rows:
user_data = {
"id": row["id"],
"username": row["username"],
"uuid": row["uuid"],
"role": row["role"],
"role_name": ROLE_NAMES.get(row["role"], "Неизвестно"),
"created_at": row["created_at"],
"last_login": row["last_login"],
}
# Elder и Creator видят больше информации
if current_user["role"] >= ROLE_ELDER:
user_data["is_active"] = row["is_active"]
# Получаем информацию о проходке
pass_info = conn.execute("""
SELECT p.code, p.expires_at, up.activated_at
FROM user_passes up
JOIN passes p ON up.pass_code = p.code
WHERE up.user_id = ? AND (p.expires_at IS NULL OR p.expires_at > ?)
LIMIT 1
""", (row["id"], time.time())).fetchone()
if pass_info:
user_data["has_pass"] = True
user_data["pass_expires"] = pass_info["expires_at"]
users.append(user_data)
return {"users": users, "total": len(users)}
@router.get("/users/{user_id}")
async def get_user_detail(
user_id: int,
current_user: dict = Depends(require_role(ROLE_MODERATOR))
):
"""Детальная информация о пользователе"""
with get_db() as conn:
row = conn.execute("""
SELECT id, username, email, uuid, role, created_at, last_login, is_active, banned_until
FROM users WHERE id = ?
""", (user_id,)).fetchone()
if not row:
raise HTTPException(404, "Пользователь не найден")
# Модераторы не видят email обычных пользователей
if current_user["role"] < ROLE_ELDER and row["role"] < ROLE_MODERATOR:
email = None
else:
email = row["email"]
# Получаем активную проходку
pass_info = None
if row["role"] >= ROLE_PASS_HOLDER or current_user["role"] >= ROLE_ELDER:
pass_row = conn.execute("""
SELECT p.code, p.expires_at, up.activated_at
FROM user_passes up
JOIN passes p ON up.pass_code = p.code
WHERE up.user_id = ? AND (p.expires_at IS NULL OR p.expires_at > ?)
LIMIT 1
""", (user_id, time.time())).fetchone()
if pass_row:
pass_info = {
"code": pass_row["code"][:8] + "..." if current_user["role"] < ROLE_ELDER else pass_row["code"],
"expires_at": pass_row["expires_at"],
"activated_at": pass_row["activated_at"]
}
# Логи действий (только для Elder+)
actions = []
if current_user["role"] >= ROLE_ELDER:
action_rows = conn.execute("""
SELECT action, details, timestamp FROM audit_log
WHERE user_id = ? ORDER BY timestamp DESC LIMIT 20
""", (user_id,)).fetchall()
actions = [dict(row) for row in action_rows]
return {
"id": row["id"],
"username": row["username"],
"email": email,
"uuid": row["uuid"],
"role": row["role"],
"role_name": ROLE_NAMES.get(row["role"], "Неизвестно"),
"created_at": row["created_at"],
"last_login": row["last_login"],
"is_active": row["is_active"],
"banned_until": row["banned_until"],
"has_pass": pass_info is not None,
"pass_info": pass_info,
"recent_actions": actions if current_user["role"] >= ROLE_ELDER else None
}
@router.put("/users/{user_id}/role")
async def update_user_role(
user_id: int,
body: UpdateRoleRequest,
current_user: dict = Depends(require_role(ROLE_ELDER)),
request: Request = None
):
"""Изменение роли пользователя"""
ip = request.client.host if request.client else "unknown"
with get_db() as conn:
target = conn.execute(
"SELECT id, username, role FROM users WHERE id = ?",
(user_id,)
).fetchone()
if not target:
raise HTTPException(404, "Пользователь не найден")
# Проверки прав
if target["role"] == ROLE_CREATOR and current_user["role"] != ROLE_CREATOR:
raise HTTPException(403, "Нельзя изменить роль создателя")
if target["role"] >= current_user["role"] and current_user["role"] != ROLE_CREATOR:
raise HTTPException(403, "Нельзя изменять роль пользователя с равным или высшим уровнем")
if body.role > current_user["role"] and current_user["role"] != ROLE_CREATOR:
raise HTTPException(403, f"Нельзя назначить роль выше своей ({ROLE_NAMES[current_user['role']]})")
# Elder не может создавать других Elder (только Creator)
if body.role == ROLE_ELDER and current_user["role"] != ROLE_CREATOR:
raise HTTPException(403, "Только создатель может назначать Elder Moderator")
# Проверяем, нужно ли выдать/отозвать проходку
old_role = target["role"]
new_role = body.role
conn.execute(
"UPDATE users SET role = ? WHERE id = ?",
(new_role, user_id)
)
# Управление проходками при изменении роли
now = time.time()
if new_role >= ROLE_PASS_HOLDER and old_role < ROLE_PASS_HOLDER:
# Выдаем проходку если её нет
existing = conn.execute("""
SELECT 1 FROM user_passes up
JOIN passes p ON up.pass_code = p.code
WHERE up.user_id = ? AND (p.expires_at IS NULL OR p.expires_at > ?)
""", (user_id, now)).fetchone()
if not existing:
# Создаем автоматическую проходку
pass_code = f"AUTO_{secrets.token_hex(8).upper()}"
conn.execute("""
INSERT INTO passes (code, owner, expires_at, max_uses, is_active)
VALUES (?, ?, NULL, 1, 1)
""", (pass_code, target["username"]))
conn.execute("""
INSERT INTO user_passes (user_id, pass_code, activated_at)
VALUES (?, ?, ?)
""", (user_id, pass_code, now))
logger.info("Auto-pass issued", user=target["username"], role=new_role)
elif new_role < ROLE_PASS_HOLDER and old_role >= ROLE_PASS_HOLDER:
# Отзываем проходку
conn.execute("""
UPDATE passes SET is_active = 0
WHERE code IN (SELECT pass_code FROM user_passes WHERE user_id = ?)
""", (user_id,))
logger.info("Auto-pass revoked", user=target["username"])
conn.commit()
log_audit(
current_user["id"],
"role_change",
f"Changed role of {target['username']} from {old_role} to {new_role}",
ip
)
logger.info("Role updated", admin=current_user["username"], target=target["username"], new_role=new_role)
return {
"success": True,
"user_id": user_id,
"username": target["username"],
"old_role": old_role,
"old_role_name": ROLE_NAMES.get(old_role, "Неизвестно"),
"new_role": new_role,
"new_role_name": ROLE_NAMES.get(new_role, "Неизвестно")
}
@router.post("/pass/grant")
async def grant_pass(
body: CreatePassDirectRequest,
current_user: dict = Depends(require_role(ROLE_ELDER)),
request: Request = None
):
"""Выдача проходки пользователю (Elder+ могут выдавать)"""
ip = request.client.host if request.client else "unknown"
# Проверяем право на прямую выдачу
if current_user["role"] < ROLE_CREATOR and not has_permission(current_user["role"], Permissions.APPROVE_PASS):
raise HTTPException(403, "Недостаточно прав для выдачи проходки")
with get_db() as conn:
target = conn.execute(
"SELECT id, username, role FROM users WHERE username = ? COLLATE NOCASE",
(body.username,)
).fetchone()
if not target:
raise HTTPException(404, f"Пользователь {body.username} не найден")
# Проверяем, есть ли уже активная проходка
existing = conn.execute("""
SELECT p.code FROM user_passes up
JOIN passes p ON up.pass_code = p.code
WHERE up.user_id = ? AND (p.expires_at IS NULL OR p.expires_at > ?)
""", (target["id"], time.time())).fetchone()
if existing:
raise HTTPException(409, f"У пользователя {body.username} уже есть активная проходка")
# Создаем проходку
pass_code = secrets.token_hex(12).upper()
now = time.time()
expires_at = now + (body.expires_days * 86400) if body.expires_days else None
conn.execute("""
INSERT INTO passes (code, owner, expires_at, max_uses, is_active)
VALUES (?, ?, ?, ?, 1)
""", (pass_code, target["username"], expires_at, body.max_uses))
conn.execute("""
INSERT INTO user_passes (user_id, pass_code, activated_at)
VALUES (?, ?, ?)
""", (target["id"], pass_code, now))
# Обновляем роль если нужно
if target["role"] < ROLE_PASS_HOLDER:
conn.execute(
"UPDATE users SET role = ? WHERE id = ?",
(ROLE_PASS_HOLDER, target["id"])
)
conn.commit()
log_audit(
current_user["id"],
"grant_pass",
f"Granted pass to {target['username']} (expires: {body.expires_days}d, max_uses: {body.max_uses})",
ip
)
logger.info("Pass granted", admin=current_user["username"], target=target["username"], code=pass_code)
return {
"success": True,
"pass_code": pass_code,
"username": target["username"],
"expires_at": expires_at,
"expires_days": body.expires_days,
"max_uses": body.max_uses
}
@router.delete("/pass/revoke/{username}")
async def revoke_pass(
username: str,
current_user: dict = Depends(require_role(ROLE_ELDER)),
request: Request = None
):
"""Отзыв проходки у пользователя"""
ip = request.client.host if request.client else "unknown"
with get_db() as conn:
target = conn.execute(
"SELECT id, username, role FROM users WHERE username = ? COLLATE NOCASE",
(username,)
).fetchone()
if not target:
raise HTTPException(404, f"Пользователь {username} не найден")
# Отзываем проходку
conn.execute("""
UPDATE passes SET is_active = 0
WHERE code IN (SELECT pass_code FROM user_passes WHERE user_id = ?)
""", (target["id"],))
# Понижаем роль если она была только из-за проходки
if target["role"] == ROLE_PASS_HOLDER:
conn.execute(
"UPDATE users SET role = ? WHERE id = ?",
(ROLE_USER, target["id"])
)
conn.commit()
log_audit(current_user["id"], "revoke_pass", f"Revoked pass from {username}", ip)
logger.info("Pass revoked", admin=current_user["username"], target=username)
return {"success": True, "message": f"Проходка {username} отозвана"}
@router.post("/user/ban")
async def ban_user(
body: BanUserRequest,
current_user: dict = Depends(require_role(ROLE_ELDER)),
request: Request = None
):
"""Бан пользователя (Elder+ могут банить)"""
ip = request.client.host if request.client else "unknown"
with get_db() as conn:
target = conn.execute(
"SELECT id, username, role FROM users WHERE id = ?",
(body.user_id,)
).fetchone()
if not target:
raise HTTPException(404, "Пользователь не найден")
# Нельзя забанить создателя
if target["role"] == ROLE_CREATOR:
raise HTTPException(403, "Нельзя забанить создателя")
# Elder не может банить других Elder
if target["role"] >= ROLE_ELDER and current_user["role"] != ROLE_CREATOR:
raise HTTPException(403, "Недостаточно прав для бана этого пользователя")
banned_until = time.time() + (body.days * 86400)
conn.execute(
"UPDATE users SET is_active = 0, banned_until = ? WHERE id = ?",
(banned_until, target["id"])
)
# Отзываем проходку при бане
conn.execute("""
UPDATE passes SET is_active = 0
WHERE code IN (SELECT pass_code FROM user_passes WHERE user_id = ?)
""", (target["id"],))
conn.commit()
log_audit(
current_user["id"],
"ban_user",
f"Banned {target['username']} for {body.days} days. Reason: {body.reason}",
ip
)
logger.info("User banned", admin=current_user["username"], target=target["username"], days=body.days)
return {
"success": True,
"username": target["username"],
"banned_until": banned_until,
"days": body.days
}
@router.post("/user/unban/{user_id}")
async def unban_user(
user_id: int,
current_user: dict = Depends(require_role(ROLE_ELDER)),
request: Request = None
):
"""Разбан пользователя"""
ip = request.client.host if request.client else "unknown"
with get_db() as conn:
target = conn.execute(
"SELECT id, username FROM users WHERE id = ?",
(user_id,)
).fetchone()
if not target:
raise HTTPException(404, "Пользователь не найден")
conn.execute(
"UPDATE users SET is_active = 1, banned_until = NULL WHERE id = ?",
(user_id,)
)
conn.commit()
log_audit(current_user["id"], "unban_user", f"Unbanned {target['username']}", ip)
logger.info("User unbanned", admin=current_user["username"], target=target["username"])
return {"success": True, "username": target["username"]}
@router.get("/audit")
async def get_audit_log(
current_user: dict = Depends(require_role(ROLE_ELDER)),
limit: int = 50,
offset: int = 0,
user_id: Optional[int] = None
):
"""Просмотр логов аудита (только Elder+)"""
with get_db() as conn:
query = """
SELECT al.*, u.username
FROM audit_log al
LEFT JOIN users u ON al.user_id = u.id
"""
params = []
if user_id:
query += " WHERE al.user_id = ?"
params.append(user_id)
query += " ORDER BY al.timestamp DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
rows = conn.execute(query, params).fetchall()
total = conn.execute("SELECT COUNT(*) as count FROM audit_log").fetchone()["count"]
return {
"logs": [dict(row) for row in rows],
"total": total,
"limit": limit,
"offset": offset
}
@router.get("/stats")
async def get_admin_stats(
current_user: dict = Depends(require_role(ROLE_MODERATOR))
):
"""Статистика для админов"""
with get_db() as conn:
# Общая статистика
total_users = conn.execute("SELECT COUNT(*) as count FROM users").fetchone()["count"]
# Статистика по ролям
role_stats = conn.execute("""
SELECT role, COUNT(*) as count
FROM users
GROUP BY role
ORDER BY role DESC
""").fetchall()
# Активные проходки
active_passes = conn.execute("""
SELECT COUNT(*) as count FROM user_passes up
JOIN passes p ON up.pass_code = p.code
WHERE p.expires_at IS NULL OR p.expires_at > ?
""", (time.time(),)).fetchone()["count"]
# Забаненные пользователи
banned_users = conn.execute("""
SELECT COUNT(*) as count FROM users
WHERE is_active = 0 AND banned_until > ?
""", (time.time(),)).fetchone()["count"]
# Недавние регистрации (последние 7 дней)
week_ago = time.time() - (7 * 86400)
recent_registrations = conn.execute("""
SELECT COUNT(*) as count FROM users WHERE created_at > ?
""", (week_ago,)).fetchone()["count"]
return {
"total_users": total_users,
"active_passes": active_passes,
"banned_users": banned_users,
"recent_registrations_7d": recent_registrations,
"roles_distribution": [
{"role": r["role"], "role_name": ROLE_NAMES.get(r["role"], "Неизвестно"), "count": r["count"]}
for r in role_stats
],
"my_info": {
"role": current_user["role"],
"role_name": ROLE_NAMES.get(current_user["role"], "Неизвестно"),
"username": current_user["username"]
}
}
@router.get("/me")
async def get_my_info(current_user: dict = Depends(get_current_user)):
"""Информация о текущем пользователе с правами"""
with get_db() as conn:
row = conn.execute("""
SELECT id, username, uuid, role, created_at, last_login
FROM users WHERE id = ?
""", (current_user["id"],)).fetchone()
# Проверяем наличие активной проходки
has_pass = False
if row["role"] >= ROLE_PASS_HOLDER:
pass_row = conn.execute("""
SELECT 1 FROM user_passes up
JOIN passes p ON up.pass_code = p.code
WHERE up.user_id = ? AND (p.expires_at IS NULL OR p.expires_at > ?)
""", (current_user["id"], time.time())).fetchone()
has_pass = pass_row is not None
permissions = list(ROLE_PERMISSIONS.get(row["role"], set()))
return {
"id": row["id"],
"username": row["username"],
"uuid": row["uuid"],
"role": row["role"],
"role_name": ROLE_NAMES.get(row["role"], "Неизвестно"),
"created_at": row["created_at"],
"last_login": row["last_login"],
"has_pass": has_pass,
"permissions": permissions
}